In [None]:
!wget https://jdbc.postgresql.org/download/postgresql-42.5.1.jar --no-check-certificate

In [77]:
from pyspark.sql import SparkSession

# Start (or reuse) a local Spark session with the PostgreSQL JDBC jar shipped
# via spark.jars and placed on the driver classpath, so the JDBC writes later
# in the notebook can load org.postgresql.Driver.
spark = (
    SparkSession.builder
    .master("local")
    .config("spark.jars", "postgresql-42.5.1.jar")
    .config("spark.driver.extraClassPath", "postgresql-42.5.1.jar")
    .getOrCreate()
)
sc = spark.sparkContext

In [78]:
from pyspark.sql.types import *
# Explicit column types for the events CSV read in the next cell; without a
# schema (and without inferSchema) Spark would read every column as a string.
#
# ID columns are LongType rather than IntegerType: a value above 2**31 - 1
# cannot be parsed as IntegerType, and under the default PERMISSIVE CSV mode it
# is silently turned into null instead of raising.
# NOTE(review): category_id values in this dataset are believed to exceed the
# 32-bit range — confirm against the data. Widening also makes the JDBC tables
# written later use bigint columns for these IDs.
schema = StructType([
    StructField('event_time', TimestampType(), True),
    StructField('event_type', StringType(), True),
    StructField('product_id', LongType(), True),
    StructField('category_id', LongType(), True),
    StructField('category_code', StringType(), True),
    StructField('brand', StringType(), True),
    StructField('price', DoubleType(), True),
    StructField('user_id', LongType(), True),
    StructField('user_session', StringType(), True),
])

In [79]:
# Load the raw event rows using the explicit schema defined above.
# NOTE(review): no `header` option is passed (Spark's default is header=false),
# so if the CSV starts with a header row it is parsed as data — confirm.
df = spark.read.schema(schema).csv(r"2019-Oct-Sample.csv")

In [83]:
# Explicit import: this cell previously relied on regexp_extract leaking into
# the namespace from a cell that is no longer present, and failed on a fresh
# kernel.
from pyspark.sql.functions import regexp_extract

# Split category_code into two derived columns:
#   department — the leading run of letters (same pattern as before);
#   category   — the next run of letters after the first non-letter separator.
# Before this change `category` used group 1 of a single-group pattern, which
# equals group 0, so it was always a copy of `department`.
# NOTE(review): assumes category_code looks like "<dept><sep><category>..." —
# the separator is not visible here. As with department, a segment containing
# '_' or digits is cut at the first such character. A code with no second
# segment yields '' for category; a null code stays null.
df = (
    df
    .withColumn('department', regexp_extract('category_code', r'^([A-Za-z]*)', 1))
    .withColumn('category', regexp_extract('category_code', r'^[A-Za-z]*[^A-Za-z]+([A-Za-z]*)', 1))
)

In [None]:
# Explicit module import: the old code used bare sum/when/mean/countDistinct,
# which only worked if a star import had shadowed the builtin `sum`.
from pyspark.sql import functions as F


def countif(condition):
    """Return an aggregate Column counting rows where `condition` is true.

    Rows where the condition is false or null contribute 0.
    """
    return F.sum(F.when(condition, 1).otherwise(0))


# Per-brand summary. avg_price is the mean over every event row of the brand,
# so products with more events weigh more in the average.
# Keep the DataFrame itself in df_brands: `.show()` returns None, and assigning
# its result here broke the JDBC write of df_brands later on.
df_brands = (
    df.groupBy('brand')
    .agg(
        F.mean('price').alias('avg_price'),
        F.countDistinct('user_session').alias('total_user_sessions'),
        F.countDistinct('user_id').alias('total_user_id'),
        countif(df.event_type == 'view').alias('total_views'),
        countif(df.event_type == 'purchase').alias('total_orders'),
        countif(df.event_type == 'cart').alias('total_carted'),
    )
)
df_brands.show()

In [None]:
import os
from getpass import getpass

# Persist the raw event rows to the PostgreSQL table `user_events` over JDBC.
# Connection settings come from the environment instead of being hard-coded in
# the notebook. URL and user keep their previous values as defaults. The
# password is never stored: it is taken from POSTGRES_PASSWORD, or you are
# prompted for it.
# NOTE(review): no save mode is set, so Spark's default (error if the table
# already exists) applies and re-running this cell fails once the table exists.
jdbc_url = os.environ.get("POSTGRES_JDBC_URL", "jdbc:postgresql://postgres:5432/postgres")
jdbc_user = os.environ.get("POSTGRES_USER", "postgres")
jdbc_password = os.environ.get("POSTGRES_PASSWORD") or getpass("PostgreSQL password: ")

(
    df.write
    .format("jdbc")
    .option("url", jdbc_url)
    .option("dbtable", "user_events")
    .option("user", jdbc_user)
    .option("password", jdbc_password)
    .option("driver", "org.postgresql.Driver")
    .save()
)

In [None]:
import os
from getpass import getpass

# Persist the per-brand summary to the PostgreSQL table `brand_details` over
# JDBC. Connection settings come from the environment instead of being
# hard-coded in the notebook. URL and user keep their previous values as
# defaults. The password is taken from POSTGRES_PASSWORD, or you are prompted
# for it.
# NOTE(review): no save mode is set, so Spark's default (error if the table
# already exists) applies and re-running this cell fails once the table exists.
jdbc_url = os.environ.get("POSTGRES_JDBC_URL", "jdbc:postgresql://postgres:5432/postgres")
jdbc_user = os.environ.get("POSTGRES_USER", "postgres")
jdbc_password = os.environ.get("POSTGRES_PASSWORD") or getpass("PostgreSQL password: ")

(
    df_brands.write
    .format("jdbc")
    .option("url", jdbc_url)
    .option("dbtable", "brand_details")
    .option("user", jdbc_user)
    .option("password", jdbc_password)
    .option("driver", "org.postgresql.Driver")
    .save()
)