# JSON tokenizer & JSONL parser 

## Tokenizer

In [3]:

class Token:
    """A JSON token: a type tag, a value, and the position it was found at."""

    def __init__(self, t, v, pos):
        self.type = t
        self.value = v
        self.pos = pos

class Tokenizer:
    """Split JSON text into tokens.

    The tokenizer keeps a cursor (``self.i``) into ``self.text`` and exposes
    small ``read_*`` helpers that consume one lexeme each, plus ``tokens()``
    which drives them and yields ``Token`` objects.

    All lexical errors are reported as ``SyntaxError``.
    """

    # Insignificant whitespace between tokens.
    ws = set(" \t\r\n")
    # ASCII digits only: str.isdigit() also accepts characters such as
    # superscripts and non-Latin digits, which are not valid in JSON numbers.
    _digits = set("0123456789")
    # Characters allowed in the four hex digits of a \uXXXX escape.
    _hex_digits = set("0123456789abcdefABCDEF")
    # Single-character escapes and the character each one stands for.
    _escapes = {
        '"': '"', '\\': '\\', '/': '/',
        'b': '\b', 'f': '\f', 'n': '\n', 'r': '\r', 't': '\t',
    }

    def __init__(self, text):
        self.text, self.n, self.i = text, len(text), 0

    def peek(self):
        """Return the character under the cursor, or '' at end of input."""
        return self.text[self.i] if self.i < self.n else ''

    def next(self):
        """Return the character under the cursor and advance past it."""
        ch = self.peek()
        self.i += 1
        return ch

    def skip_ws(self):
        """Advance the cursor past any run of whitespace."""
        while self.i < self.n and self.text[self.i] in self.ws:
            self.i += 1

    def _read_hex4(self):
        """Consume exactly four hex digits and return their integer value."""
        digits = self.text[self.i:self.i + 4]
        # Check each character explicitly: int(..., 16) alone would also
        # accept forms like "0x1f" or "1_2f".
        if len(digits) != 4 or any(d not in self._hex_digits for d in digits):
            raise SyntaxError(f"Bad unicode escape at {self.i}")
        self.i += 4
        return int(digits, 16)

    def _read_unicode_escape(self):
        """Decode a \\uXXXX escape; the cursor is just after the 'u'.

        A high surrogate immediately followed by an escaped low surrogate is
        combined into the single character the pair encodes.
        """
        code = self._read_hex4()
        if 0xD800 <= code <= 0xDBFF and self.text.startswith('\\u', self.i):
            resume = self.i
            self.i += 2
            low = self._read_hex4()
            if 0xDC00 <= low <= 0xDFFF:
                return chr(0x10000 + ((code - 0xD800) << 10) + (low - 0xDC00))
            # Not a surrogate pair: rewind so the second escape is decoded
            # on its own by the caller's loop.
            self.i = resume
        return chr(code)

    def read_str(self, start):
        """Consume a double-quoted string and return its decoded contents.

        The cursor must be on the opening quote. ``start`` is only used in
        error messages. Raises ``SyntaxError`` for an unterminated string or
        an invalid escape sequence.
        """
        self.next()  # skip opening quote
        out = []
        while True:
            if self.i >= self.n:
                raise SyntaxError(f"String not closed (from {start})")
            ch = self.next()
            if ch == '"':
                break
            if ch == '\\':
                if self.i >= self.n:
                    raise SyntaxError("Bad escape sequence")
                esc = self.next()
                if esc == 'u':
                    out.append(self._read_unicode_escape())
                elif esc in self._escapes:
                    out.append(self._escapes[esc])
                else:
                    raise SyntaxError(f"Unknown escape \\{esc}")
            else:
                out.append(ch)
        return ''.join(out)

    def _skip_digits(self):
        """Advance the cursor past a run of ASCII digits."""
        while self.peek() in self._digits:
            self.next()

    def read_num(self, start):
        """Consume a JSON number and return it as ``int`` or ``float``.

        Accepts an optional minus sign, an integer part, an optional fraction
        and an optional exponent. The result is a ``float`` when a fraction
        or exponent is present, otherwise an ``int``. ``start`` is only used
        in error messages. Raises ``SyntaxError`` on a malformed number.
        """
        j = self.i
        if self.peek() == '-':
            self.next()
        if self.peek() == '0':
            self.next()
        else:
            # peek() is '' at end of input, and '' is not in the digit set.
            if self.peek() not in self._digits:
                raise SyntaxError(f"Bad number at {start}")
            self._skip_digits()
        if self.peek() == '.':
            self.next()
            if self.peek() not in self._digits:
                raise SyntaxError("Bad decimal")
            self._skip_digits()
        # Compare against a tuple, not the string 'eE': '' in 'eE' is True.
        if self.peek() in ('e', 'E'):
            self.next()
            if self.peek() in ('+', '-'):
                self.next()
            if self.peek() not in self._digits:
                raise SyntaxError("Bad exponent")
            self._skip_digits()
        s = self.text[j:self.i]
        is_float = '.' in s or 'e' in s or 'E' in s
        return float(s) if is_float else int(s)

    def read_kw(self, start):
        """Consume ``true``, ``false`` or ``null`` and return the Python value.

        ``start`` is only used in the error message. Raises ``SyntaxError``
        when none of the three literals begins at the cursor.
        """
        for k, v in [("true", True), ("false", False), ("null", None)]:
            if self.text.startswith(k, self.i):
                self.i += len(k)
                return v
        raise SyntaxError(f"Unknown literal near {start}")

    def tokens(self):
        """Yield ``Token`` objects for the whole input, ending with "EOF".

        Punctuation tokens use the character itself as both type and value;
        other tokens have type "STR", "NUM" or "KW".
        """
        while True:
            self.skip_ws()
            if self.i >= self.n:
                yield Token("EOF", None, self.i)  # end of input
                return
            ch = self.peek()
            pos = self.i
            if ch in '{}[]:,':
                self.next()
                yield Token(ch, ch, pos)
            elif ch == '"':
                yield Token("STR", self.read_str(pos), pos)
            elif ch in '-0123456789':
                yield Token("NUM", self.read_num(pos), pos)
            else:
                yield Token("KW", self.read_kw(pos), pos)


## Parser

In [4]:
# Stream and Parser work on tokens: they check how tokens fit together, not how characters form a token.
class Stream:
    """Token stream with one-token lookahead over ``tokenizer.tokens()``."""

    def __init__(self, tokenizer):
        self.gen = tokenizer.tokens()
        self.buf = []  # holds a token that was peeked but not yet consumed

    def peek(self):
        """Return the upcoming token without consuming it."""
        if self.buf:
            return self.buf[0]
        upcoming = next(self.gen)
        self.buf.append(upcoming)
        return upcoming

    def next(self):
        """Consume and return the upcoming token."""
        return self.buf.pop(0) if self.buf else next(self.gen)

    def expect(self, t):
        """Consume the upcoming token; raise SyntaxError unless its type is ``t``."""
        tok = self.next()
        if tok.type == t:
            return tok
        raise SyntaxError(f"Expect {t} at {tok.pos}, got {tok.type}")

class Parser:
    """Recursive-descent JSON parser that builds plain Python values."""

    def parse(self, text):
        """Parse ``text`` as one JSON value; trailing tokens are an error."""
        ts = Stream(Tokenizer(text))
        result = self.value(ts)
        if ts.peek().type == "EOF":
            return result
        raise SyntaxError("Extra content after JSON")

    def value(self, ts):
        """Parse whichever value starts at the upcoming token."""
        kind = ts.peek().type
        if kind == '{':
            return self.obj(ts)
        if kind == '[':
            return self.arr(ts)
        if kind in ('STR', 'NUM', 'KW'):
            # Scalars already carry their decoded value on the token.
            return ts.next().value
        raise SyntaxError(f"Unexpected token {kind}")

    def obj(self, ts):
        """Parse ``{ "key": value, ... }`` into a dict."""
        ts.expect('{')
        members = {}
        if ts.peek().type == '}':
            ts.next()
            return members
        while True:
            name = ts.expect("STR").value
            ts.expect(':')
            members[name] = self.value(ts)
            sep = ts.peek().type
            if sep not in (',', '}'):
                raise SyntaxError(f"Unexpected {sep}")
            ts.next()
            if sep == '}':
                return members

    def arr(self, ts):
        """Parse ``[ value, ... ]`` into a list."""
        ts.expect('[')
        items = []
        if ts.peek().type == ']':
            ts.next()
            return items
        while True:
            items.append(self.value(ts))
            sep = ts.peek().type
            if sep not in (',', ']'):
                raise SyntaxError(f"Unexpected {sep}")
            ts.next()
            if sep == ']':
                return items


## Collection

In [8]:
class Collection:
    """In-memory list of documents (dicts) with simple query operations."""

    def __init__(self, data):
        # A single document is wrapped so that self.data is always a list.
        if not isinstance(data, list):
            data = [data]
        self.data = data

    def _extract_key(self, doc, key):
        """Look up ``key`` in ``doc``; dots in ``key`` descend into nested dicts.

        Returns None as soon as a path segment is missing or the current
        value is not a dict.
        """
        node = doc
        for part in key.split("."):
            if isinstance(node, dict) and part in node:
                node = node[part]
            else:
                return None
        return node

    def find(self, query=None):
        """Return the documents whose fields equal every value in ``query``.

        ``query`` maps (possibly dotted) keys to required values, e.g.
        ``{"attitude_count": 500, "likes": 2}``. With ``query=None`` the
        underlying list itself is returned.
        """
        if query is None:
            return self.data

        def matches(doc):
            # A document is rejected on the first field that differs.
            return not any(
                self._extract_key(doc, field) != wanted
                for field, wanted in query.items()
            )

        return [doc for doc in self.data if matches(doc)]

    def project(self, fields):
        """Return one dict per document holding only ``fields``.

        Example: ``fields = ["user", "text"]``. Dotted names are resolved
        through nested dicts and stored under the dotted name itself.
        """
        return [
            {field: self._extract_key(doc, field) for field in fields}
            for doc in self.data
        ]

    def groupby(self, key):
        """Bucket documents by the value found at ``key``.

        Returns a dict mapping each distinct value to its list of documents.
        """
        buckets = {}
        for doc in self.data:
            label = self._extract_key(doc, key)
            members = buckets.get(label)
            if members is None:
                members = buckets[label] = []
            members.append(doc)
        return buckets

    def aggregate(self, group_key, agg_func):
        """Group by ``group_key`` and apply ``agg_func`` to each group's documents."""
        return {
            label: agg_func(members)
            for label, members in self.groupby(group_key).items()
        }

    def hash_join(self, other, key_self, key_other, join_type="inner"):
        """Join this collection with ``other`` on equal key values.

        join_type: inner / left / right / full

        Each result row is ``{"left": doc_or_None, "right": doc_or_None}``.
        """
        # Index the right-hand side by its join value.
        index = {}
        for right_doc in other.data:
            join_value = self._extract_key(right_doc, key_other)
            index.setdefault(join_value, []).append(right_doc)

        keep_unmatched_left = join_type in ("left", "full")
        keep_unmatched_right = join_type in ("right", "full")

        rows = []
        # Right-hand documents are tracked by identity, not by value.
        seen_right = set()

        for left_doc in self.data:
            partners = index.get(self._extract_key(left_doc, key_self))
            if partners is None:
                if keep_unmatched_left:
                    rows.append({"left": left_doc, "right": None})
                continue
            for right_doc in partners:
                seen_right.add(id(right_doc))
                rows.append({"left": left_doc, "right": right_doc})

        if keep_unmatched_right:
            rows.extend(
                {"left": None, "right": right_doc}
                for right_doc in other.data
                if id(right_doc) not in seen_right
            )

        return rows

    def pipeline(self, query=None, project_fields=None,
                 group_key=None, agg_func=None,
                 join_collection=None, join_self_key=None,
                 join_other_key=None, join_type="inner"):
        """Run the requested stages in order: find, project, aggregate, join.

        A stage is skipped when its arguments are falsy. Each stage receives
        the previous stage's output wrapped in a new Collection.
        """
        current = self.data

        if query:
            current = Collection(current).find(query)

        if project_fields:
            current = Collection(current).project(project_fields)

        if group_key and agg_func:
            current = Collection(current).aggregate(group_key, agg_func)

        if join_collection:
            current = Collection(current).hash_join(
                join_collection, join_self_key, join_other_key, join_type
            )

        return current


## Aggregate functions

In [9]:

def agg_count(field=None):
    """Build an aggregator that returns the number of documents in a group.

    ``field`` is accepted but not used.
    """
    def count(docs):
        return len(docs)

    return count

def agg_sum(field):
    """Build an aggregator that sums ``field`` over a group of documents.

    Documents where ``field`` is missing or is not an int/float are skipped.
    """
    def total(docs):
        candidates = (doc.get(field) for doc in docs)
        return sum(v for v in candidates if isinstance(v, (int, float)))

    return total

def agg_max(field):
    """Build an aggregator that returns the largest ``field`` value in a group.

    Documents where ``field`` is missing or is not an int/float are skipped.
    The aggregator returns None when no document contributes a value, rather
    than letting ``max()`` raise ValueError on an empty sequence.
    """
    def largest(docs):
        candidates = (doc.get(field) for doc in docs)
        return max(
            (v for v in candidates if isinstance(v, (int, float))),
            default=None,
        )

    return largest

def agg_min(field):
    """Build an aggregator that returns the smallest ``field`` value in a group.

    Documents where ``field`` is missing or is not an int/float are skipped.
    The aggregator returns None when no document contributes a value, rather
    than letting ``min()`` raise ValueError on an empty sequence.
    """
    def smallest(docs):
        candidates = (doc.get(field) for doc in docs)
        return min(
            (v for v in candidates if isinstance(v, (int, float))),
            default=None,
        )

    return smallest

def agg_avg(field):
    """Build an aggregator that returns the mean of ``field`` over a group.

    Documents where ``field`` is missing or is not an int/float are skipped,
    and the mean is taken over the values that were actually summed, so
    skipped documents do not drag the result toward zero. The aggregator
    returns None when no document contributes a value.
    """
    def mean(docs):
        values = [
            v for v in (doc.get(field) for doc in docs)
            if isinstance(v, (int, float))
        ]
        if not values:
            return None
        return sum(values) / len(values)

    return mean


## chunk processing

In [10]:
def load_json_chunks(path, chunk_size=5000):
    """Yield the documents stored at ``path`` in lists of up to ``chunk_size``.

    Generic loader:
        - if JSONL:  one JSON object per line
        - if JSON array: [ {...}, {...} ]

    The format is chosen from the first non-whitespace character of the file:
    ``[`` selects the JSON-array branch, anything else is read as JSONL.
    The array branch parses the whole file at once and then slices it; the
    JSONL branch parses line by line and skips blank lines.
    """
    with open(path, "r", encoding="utf-8") as f:
        # Sniff the format from the first significant character, so that an
        # array file starting with spaces or a blank line is still recognised.
        first_char = f.read(1)
        while first_char and first_char in " \t\r\n":
            first_char = f.read(1)
        f.seek(0)  # go back to the beginning before the real read

        parser = Parser()
        if first_char == "[":  # JSON array
            arr = parser.parse(f.read())
            for i in range(0, len(arr), chunk_size):
                yield arr[i:i + chunk_size]
        else:  # JSONL
            buffer = []
            for line in f:
                line = line.strip()
                if not line:
                    continue
                buffer.append(parser.parse(line))
                if len(buffer) >= chunk_size:
                    yield buffer
                    buffer = []
            # Emit the final, partially filled chunk.
            if buffer:
                yield buffer


## merge partial results

In [11]:
class PartialAgg:
    """Merge partial aggregation results computed on separate chunks.

    For max, min and avg a partial result of None (or a count of 0 for avg)
    means "no values seen yet" and is ignored by the merge.
    """

    @staticmethod
    def merge_count(v1, v2):
        """Return the combined count of two partial counts."""
        return v1 + v2

    @staticmethod
    def merge_sum(v1, v2):
        """Return the combined sum of two partial sums."""
        return v1 + v2

    @staticmethod
    def merge_max(v1, v2):
        """Return the larger of two partial maxima; None is treated as absent."""
        if v1 is None:
            return v2
        if v2 is None:
            return v1
        return max(v1, v2)

    @staticmethod
    def merge_min(v1, v2):
        """Return the smaller of two partial minima; None is treated as absent."""
        if v1 is None:
            return v2
        if v2 is None:
            return v1
        return min(v1, v2)

    @staticmethod
    def merge_avg(avg1, count1, avg2, count2):
        """Merge two partial averages, weighting each by its count.

        Returns ``(merged_average, total_count)``. A side whose count is 0
        contributes nothing, so its average may be None. When both counts
        are 0 the result is ``(None, 0)`` instead of a ZeroDivisionError.
        """
        total = count1 + count2
        if total == 0:
            return None, 0
        weighted = 0
        # Skip empty sides so a None average is never multiplied.
        if count1:
            weighted += avg1 * count1
        if count2:
            weighted += avg2 * count2
        return weighted / total, total


## Chunked aggregation: average engagement by location

In [12]:
def calculate_average_engagement_by_location(filepath, chunk_size=5000):
    """Compute the Average Engagement Rate (AER) per "ip_location".

    The file is read chunk by chunk through ``load_json_chunks``. Each chunk
    is aggregated on its own and the per-chunk results are folded into
    running totals with ``PartialAgg``.

    AER = (Total Reposts + Total Comments + Total Attitudes) / Total Posts

    Returns a dict mapping each location to
    ``{"Total_Posts": ..., "Avg_Engagement_Rate": ...}``.
    """
    group_field = "ip_location"

    # Running totals across all chunks, keyed by location.
    post_totals = {}
    interaction_totals = {
        "reposts_count": {},
        "comments_count": {},
        "attitudes_count": {},
    }

    for chunk in load_json_chunks(filepath, chunk_size):
        coll = Collection(chunk)

        # Fold this chunk's post counts into the global counts.
        chunk_counts = coll.aggregate(group_field, agg_count())
        for loc, n_posts in chunk_counts.items():
            post_totals[loc] = PartialAgg.merge_count(
                post_totals.get(loc, 0), n_posts
            )

        # Fold this chunk's sum of each interaction field into its global sum.
        for field, running in interaction_totals.items():
            chunk_sums = coll.aggregate(group_field, agg_sum(field))
            for loc, subtotal in chunk_sums.items():
                running[loc] = PartialAgg.merge_sum(
                    running.get(loc, 0), subtotal
                )

    reposts = interaction_totals["reposts_count"]
    comments = interaction_totals["comments_count"]
    attitudes = interaction_totals["attitudes_count"]

    final_results = {}
    for loc, total_posts in post_totals.items():
        total_interactions = (
            reposts.get(loc, 0) + comments.get(loc, 0) + attitudes.get(loc, 0)
        )
        # Guard against division by zero.
        if total_posts:
            rate = total_interactions / total_posts
        else:
            rate = 0
        final_results[loc] = {
            "Total_Posts": total_posts,
            "Avg_Engagement_Rate": rate,
        }

    return final_results