In [None]:
import os

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, lit, coalesce
from pyspark.sql.types import (
    StructType,
    StructField,
    IntegerType,
    DoubleType,
    StringType,
    TimestampType,
)
from pyspark.sql.window import Window
import pyspark.sql.functions as F

# Spark session for this playground: 4 GB for both executor and driver.
# NOTE(review): getOrCreate() returns an already-running session if one exists,
# in which case these memory settings presumably do not take effect — confirm.
spark_settings = {
    "spark.executor.memory": "4g",
    "spark.driver.memory": "4g",
}
spark_builder = SparkSession.builder.appName("Pyspark playground")
for setting_key, setting_value in spark_settings.items():
    spark_builder = spark_builder.config(setting_key, setting_value)
spark = spark_builder.getOrCreate()

In [None]:
# Load the raw scraped items; Spark's JSON reader expects JSON Lines
# (one object per line) by default.
df = spark.read.format("json").load("data/items.jl")

In [None]:
# Drop scraped fields that the rest of the notebook never uses.
drop_cols = ["address", "direction", "distance", "renovate", "tel", "code"]
df = df.drop(*drop_cols)

In [None]:
# Fix the column order: identity, coordinates, product list, then region fields.
column_order = [
    "id",
    "name",
    "lat",
    "lng",
    "products",
    "province",
    "district",
    "subdistrict",
    "postcode",
]
df = df.select(*column_order)

In [None]:
# Scraping grid cells can overlap at their boundaries, so the same branch may be
# scraped more than once; keep one row per "id". Spark does not guarantee
# which of the duplicate rows survives.
df = df.dropDuplicates(subset=["id"])

In [None]:
# Some records have a NULL region, but there are only 7 of them, so they are
# skipped for now. To inspect them: df.where(col('province').isNull()).show()
df = df.filter(col("province").isNotNull())

In [None]:
# Preview the cleaned frame (Spark's defaults: first 20 rows, long cells truncated).
df.show(n=20, truncate=True)

# Crunch

## Groupby region

In [None]:
# Number of branches per province / district / subdistrict.
region_keys = ["province", "district", "subdistrict"]
groupby_region = (
    df.groupBy(*region_keys)
    .agg(F.count("*").alias("count"))
)
groupby_region.show()

In [None]:
# groupby_region.toPandas().to_csv('data/output/groupby_region.csv', index=False)

## Groupby region and product

In [None]:
# Flatten the `products` array into one row per (branch, product), with the
# struct's `code` / `name` fields promoted to product_code / product_name.
#
# explode_outer (rather than explode) keeps branches whose `products` is NULL or
# an empty array: they appear once with NULL product_code / product_name.
# Plain explode drops such rows silently, so those branches would disappear from
# the per-branch aggregation further down (whose count("product_name") reports
# 0 for them).
#
# NOTE: this cell reassigns `df` from itself and drops `products`, so running it
# a second time fails; re-run from the load cell instead.
# Cached because the product aggregations below reuse this frame.
df = (
    df.withColumn("products", F.explode_outer("products"))
    .select(
        *df.columns,
        col("products.code").alias("product_code"),
        col("products.name").alias("product_name"),
    )
    .drop("products")
    .cache()
)

In [None]:
# Row count per region and product name (one row per branch/product pair).
region_product_keys = ["province", "district", "subdistrict", "product_name"]
groupby_region_product = df.groupBy(*region_product_keys).agg(
    F.count("*").alias("count")
)
groupby_region_product.show()

In [None]:
# Export the region/product aggregate as CSV via pandas (collects to the driver;
# presumably small since it is already aggregated).
# pandas.to_csv does not create missing parent directories, so ensure the output
# folder exists first.
os.makedirs("data/output", exist_ok=True)
groupby_region_product.toPandas().to_csv(
    "data/output/groupby_region_product.csv", index=False
)

## Count products per branch

In [None]:
# Per branch (with its region and coordinates): number of product entries.
# count() on a column ignores NULLs, so only non-NULL product names are counted.
branch_keys = ["id", "province", "district", "subdistrict", "lat", "lng"]
groupby_region_id_product = df.groupBy(*branch_keys).agg(
    F.count(col("product_name")).alias("count")
)
groupby_region_id_product.show()

In [None]:
# Export the per-branch product counts as CSV via pandas (collects to the driver;
# presumably small since it is one row per branch).
# pandas.to_csv does not create missing parent directories, so ensure the output
# folder exists first.
os.makedirs("data/output", exist_ok=True)
groupby_region_id_product.toPandas().to_csv(
    "data/output/groupby_region_id_product.csv", index=False
)