In [81]:
import sys

# Transformation library:
from pyspark.sql import SparkSession
import pyspark.sql.functions as sqlF
from pyspark.sql.types import \
    StructType, StructField, \
    StringType, LongType, \
    DateType, IntegerType

In [82]:
# JVM X-tra arguments
jvmXtraOpts = [
"-XX:+UnlockDiagnosticVMOptions",
# "-XX:+PrintGCDetails",
# "-XX:InitiatingHeapOccupancyPercent=35",
"-XX:MaxGCPauseMillis=120000",
# "-XX:+UseStringDeduplication",


# # Serial GC:
# "-XX:+UseSerialGC",

# G1 Garbage Collector:
"-XX:+UseG1GC",
"-XX:G1HeapRegionSize=16M",

# # Parallel GC:
# "-XX:+UseParallelGC",
# "-XX:ParallelGCThreads=8",

]
# Initialize Spark Session:
spark = SparkSession.builder \
        .master("local[8]") \
        .appName("Challenge_1") \
        .config("spark.executor.memory", "4g") \
        .config("spark.driver.memory", "11g") \
        .config("spark.driver.extraJavaOptions", ' '.join(jvmXtraOpts)) \
        .getOrCreate()

In [83]:
# Define schema of the data:
# Date,Parent org,Brand,Product Id,Product View Count

viewsRawSchema = StructType(fields=[
    StructField("date", StringType(), nullable = True),
    StructField("parent_org", StringType(), nullable = True),
    StructField("brand", StringType(), nullable = True),
    StructField("product_id", StringType(), nullable = True),
    StructField("product_view_count", StringType(), nullable = True)
])

viewsSchema = StructType(fields=[
    StructField("date", DateType(), nullable = True),
    StructField("parent_org", StringType(), nullable = True),
    StructField("brand", StringType(), nullable = True),
    StructField("product_id", StringType(), nullable = True),
    StructField("product_view_count", LongType(), nullable = True)
])

In [84]:
# Load the data:
path = './q1_data.csv'
viewsDF = spark.read \
        .format('csv') \
        .schema(viewsRawSchema) \
        .option("header",True) \
        .option("enforceSchema",True) \
        .option("sep",",") \
        .option("lineSep","\r\n") \
        .option("quote",'"') \
        .option("dateFormat","yyyy-MM-dd") \
        .option("path", path) \
        .load() \
        .cache() # Load in memory

viewsDF.show(5)

+----------+--------------------+--------------------+-------------+------------------+
|      date|          parent_org|               brand|   product_id|product_view_count|
+----------+--------------------+--------------------+-------------+------------------+
|2024-06-25|          loreal_org| lorealprofessionnel|  12378088373|                 2|
|2024-05-20|colgatepalmolive_org|           palmolive|8901314508324|                 1|
|2024-05-12|       mamaearth_org|           mamaearth|8906087776758|                20|
|2024-06-24|          loreal_org| lorealprofessionnel|3474630587472|                 5|
|2024-05-11|             itc_org|itcmasterchefcook...|8901725113148|                 1|
+----------+--------------------+--------------------+-------------+------------------+
only showing top 5 rows



24/07/06 12:30:25 WARN CacheManager: Asked to cache already cached data.


In [94]:
# Print dict() Tree-Structure
# NOTE:
    # ctx: {True:list, False:dict}
def treeShow(data, ctx = True, indent = ''):
    typeFlag = (isinstance(data,list) << 0) + (isinstance(data,dict) << 1)

    if typeFlag == 3:
        print('FML')
    
    if typeFlag == 0 or len(data) == 0:
        if isinstance(data, str):
            data = f'"{data}"'
        print(data)
        return

    # if flag == 1:
    #     data = {pos:e for pos,e in enumerate(data)}

    keyPath = None
    ordFlag = False
    if typeFlag == 2:
        # Assumed meta-notation for ordering dict()
            # {"__ord__": {"path": list[key, ...], "flag": bool}
        ord = data.pop("__ord__", None)

        if not ord is None:
            keyPath = ord["path"]
            ordFlag = ord["flag"]
            assert isinstance(keyPath, list)

    items = list(data.items() if typeFlag==2 else enumerate(data))

    # Identify element:
    find = (
        lambda X, path, pos = 0: \
        X if pos == len(path) \
        else find(X[path[pos]],path, pos + 1)
    )

    # Order the items before printing:
    items.sort(
        key = lambda X: \
            X[0] if keyPath is None 
            else find(X[1], keyPath)
    )
    if ordFlag:
        items.reverse()

    if len(items) == 1 and ctx:
        k,v = items[0]
        print('─── ' + (f"{k} : " if typeFlag==2 else ""), end='')
        treeShow(v, typeFlag == 1, indent + '    ')
        return

    for i, (k, v) in enumerate(items):
        offset = ""
        
        if ctx:
            if i!=0:
                offset = indent

            if i == 0:
                offset += '┌── '
            elif i==len(data)-1:
                offset += '└── '
            else:
                offset += '├── '
        else:
            if i == 0:
                offset = '\n'
            offset += indent

            if i == len(data)-1:
                offset += '└── '
            else:
                offset += '├── '

        print(
            offset + 
            (f"""{
                '"' + k + '"' if isinstance(k, str) else k
            }: """ if typeFlag==2 else ""), 
            end=''
        )

        treeShow(
            v, 
            typeFlag == 1, 
            indent + 
            ('│   ' if i!=len(data)-1 else '    ')
        )

    return

# Test:
treeShow([1,[{2:1,4:'a'}],2,{'a':[],'b':{1:{2:'adfa'},2:['ajtryyfa']}},None,[[],[]],[3,4,[5,6,{'a':{1:2,3:4},'b':[2,6,7],'c':'c'},8,9],7,8],{1:'as',2:'adf',3:'adsfs'},[[8]]])

┌── 1
├── ─── ┌── 2: 1
│       └── 4: "a"
├── 2
├── ┌── "a": []
│   └── "b": 
│       ├── 1: 
│       │   └── 2: "adfa"
│       └── 2: 
│           └── "ajtryyfa"
├── None
├── ┌── []
│   └── []
├── ┌── 3
│   ├── 4
│   ├── ┌── 5
│   │   ├── 6
│   │   ├── ┌── "a": 
│   │   │   │   ├── 1: 2
│   │   │   │   └── 3: 4
│   │   │   ├── "b": 
│   │   │   │   ├── 2
│   │   │   │   ├── 6
│   │   │   │   └── 7
│   │   │   └── "c": "c"
│   │   ├── 8
│   │   └── 9
│   ├── 7
│   └── 8
├── ┌── 1: "as"
│   ├── 2: "adf"
│   └── 3: "adsfs"
└── ─── ─── 8


In [86]:
# Column cardinalities:
def descCols(df):
    total_records = df.count()
    res = {}
    
    for column in df.columns:
        # Calculate the cardinality of the column
        X = df.where(sqlF.col(column).isNotNull()).select(column).cache()
        cardinality = X.distinct().count()
        nulls = total_records - X.count()
        
        res[column] = {'nulls': nulls, 'cardinality': cardinality}\

        X.unpersist()

    res = {"columns": res}
    res["rows"] = total_records
    return res

# Column frequency distributions:
def colValues(df, cols=None):
    cols = df.columns if cols is None else cols
    res = {}

    for col in cols:
        # Get the value counts for the column
        valCounts = df.groupBy(df[col]).agg(sqlF.count("*").alias("count")).orderBy(sqlF.col("count").desc())
        
        # Collect the value counts into a dictionary
        res[col] = {row[col]: row["count"] for row in valCounts.collect()}
    
    return res

In [87]:
# Exploration
dimensions = descCols(viewsDF)
treeShow(dimensions)
# As cardinality of columns is low, we can store them in local memory

┌── "columns": 
│   ├── "brand": 
│   │   ├── "cardinality": 72
│   │   └── "nulls": 0
│   ├── "date": 
│   │   ├── "cardinality": 31
│   │   └── "nulls": 0
│   ├── "parent_org": 
│   │   ├── "cardinality": 16
│   │   └── "nulls": 0
│   ├── "product_id": 
│   │   ├── "cardinality": 780
│   │   └── "nulls": 0
│   └── "product_view_count": 
│       ├── "cardinality": 873
│       └── "nulls": 0
└── "rows": 7543


In [88]:
# Exploration
limit = 100
categories = colValues(
    viewsDF,
    cols = [K for K, V in dimensions['columns'].items() if V['cardinality'] < limit]
)
treeShow(categories)

┌── "brand": 
│   ├── "aashirvaad": 45
│   ├── "aashirvaadinstantmeals": 10
│   ├── "aashirvaadinstantmixes": 13
│   ├── "aashirvaadmultigrain": 81
│   ├── "aashirvaadnaturessuperfoods": 187
│   ├── "aashirvaadnaturessuperfoodsorganic": 15
│   ├── "aashirvaadselect": 44
│   ├── "asknestle": 18
│   ├── "axe": 10
│   ├── "boost": 147
│   ├── "bru": 59
│   ├── "cadburybournville": 247
│   ├── "cadburyfuse": 48
│   ├── "ceregrow": 47
│   ├── "cheryls": 56
│   ├── "closeup": 48
│   ├── "colgate": 218
│   ├── "comfort": 40
│   ├── "cornetto": 84
│   ├── "dove": 110
│   ├── "everyday": 2
│   ├── "ezee": 105
│   ├── "fiama": 2
│   ├── "garnier": 310
│   ├── "genteel": 36
│   ├── "goodknight": 51
│   ├── "hersheys": 29
│   ├── "hersheysexoticdark": 95
│   ├── "hit": 310
│   ├── "horlicks": 310
│   ├── "huggies": 280
│   ├── "itcmasterchefbasegravies": 16
│   ├── "itcmasterchefcookingpastes": 36
│   ├── "kerastase": 1
│   ├── "kissan": 120
│   ├── "kisses": 102
│   ├── "knorr": 140
│   ├── "kwal

In [89]:
# Assertions:    

# 'date' is valid:
print(
    "Invalid rows: ",
    viewsDF.select('date') \
        .where(sqlF.col('date').isNotNull()) \
        .withColumn(
            'date',
            sqlF.col('date').cast(DateType())
        ).where(sqlF.col('date').isNull()).count()
)

# 'product_view_count' is valid:
print(
    "Invalid rows: ",
    viewsDF.select('product_view_count') \
        .where(sqlF.col('product_view_count').isNotNull()) \
        .withColumn(
            'product_view_count',
            # NOTE: '.cast(LongType())' approximates fractional part. eg 89.123 -> 89
            sqlF.col('product_view_count').cast(LongType())
        ).where(sqlF.col('product_view_count').isNull()).count()
)

Invalid rows:  246
Invalid rows:  0


In [90]:
# Transformations:

# Transform schema:
for rawField, validField  in zip(viewsDF.schema.fields,viewsSchema.fields):
    viewsDF = viewsDF.withColumn(
        validField.name, 
        sqlF.col(rawField.name).cast(validField.dataType)
    )

validViewsDF = viewsDF.select([f.name for f in viewsSchema.fields]).cache()

validViewsDF.show(5)
viewsDF.unpersist()

+----------+--------------------+--------------------+-------------+------------------+
|      date|          parent_org|               brand|   product_id|product_view_count|
+----------+--------------------+--------------------+-------------+------------------+
|2024-06-25|          loreal_org| lorealprofessionnel|  12378088373|                 2|
|2024-05-20|colgatepalmolive_org|           palmolive|8901314508324|                 1|
|2024-05-12|       mamaearth_org|           mamaearth|8906087776758|                20|
|2024-06-24|          loreal_org| lorealprofessionnel|3474630587472|                 5|
|2024-05-11|             itc_org|itcmasterchefcook...|8901725113148|                 1|
+----------+--------------------+--------------------+-------------+------------------+
only showing top 5 rows



24/07/06 12:30:27 WARN CacheManager: Asked to cache already cached data.


DataFrame[date: date, parent_org: string, brand: string, product_id: string, product_view_count: bigint]

In [91]:
result = validViewsDF.groupby('parent_org','brand') \
    .agg(sqlF.sum('product_view_count').alias('cumulative_count'))

print("Rows in result:",result.count())

Rows in result: 75


In [92]:
# Since 'result' has only 75, we process locally in sequential manner:

output = { "__ord__": {"path":["org_views"], "flag": True} }
for row in result.collect():
    org = row['parent_org']
    brand = row['brand']
    views = row['cumulative_count']

    if output.get(org,None) is None:
        output[org] = {
            'org_views': views, 
            'brand_views': {
                brand: views,
                "__ord__": {"path":[], "flag": True}
            }
        }
    else:
        output[org]['org_views'] = output[org]['org_views'] + views
        output[org]['brand_views'][brand] = output[org]['brand_views'].get(brand,0) + views

# Sort the output:
treeShow(output)

┌── "loreal_org": 
│   ├── "brand_views": 
│   │   ├── "lorealparis": 721216
│   │   ├── "maybelline": 92812
│   │   ├── "garnier": 81104
│   │   ├── "cheryls": 11993
│   │   ├── "lorealprofessionnel": 9420
│   │   ├── "matrix": 1445
│   │   └── "kerastase": 1
│   └── "org_views": 917991
├── "unilever_org": 
│   ├── "brand_views": 
│   │   ├── "horlicks": 141732
│   │   ├── "lipton": 97514
│   │   ├── "dove": 75977
│   │   ├── "kwalitywalls": 74739
│   │   ├── "vaseline": 52472
│   │   ├── "redlabel": 38176
│   │   ├── "ponds": 29368
│   │   ├── "cornetto": 18261
│   │   ├── "boost": 9836
│   │   ├── "surfexcel": 8266
│   │   ├── "sunsilk": 7069
│   │   ├── "closeup": 5831
│   │   ├── "knorr": 5406
│   │   ├── "kissan": 5325
│   │   ├── "rexona": 4335
│   │   ├── "smartpick": 4123
│   │   ├── "bru": 1917
│   │   ├── "comfort": 1557
│   │   ├── "magnum": 1329
│   │   ├── "moti": 995
│   │   └── "axe": 152
│   └── "org_views": 584380
├── "mamaearth_org": 
│   ├── "brand_views": 
│   │   