In [1]:
! pip install pyspark



In [None]:
from pyspark.sql import SparkSession

# Maven coordinates handed to Spark via `spark.jars.packages`.
# NOTE(review): spark-avro is at 2.4.4 while the Mongo connector is at 3.0.1 --
# confirm both match the Spark version running on the cluster.
packages = [
    "org.apache.hadoop:hadoop-aws:3.2.0",
    "org.apache.spark:spark-avro_2.12:2.4.4",
    "org.mongodb.spark:mongo-spark-connector_2.12:3.0.1",
    "org.postgresql:postgresql:42.2.18",
]

# Create (or reuse) the session against the standalone master, passing the
# package list as a single comma-separated string.
spark = (
    SparkSession.builder
    .appName("extract data from event_store")
    .master('spark://172.1.0.2:7077')
    .config("spark.jars.packages", ",".join(packages))
    .getOrCreate()
)

# Bare expression so the notebook renders the session summary.
spark

In [None]:
# Load the `event_store.event_store` MongoDB collection through the Mongo
# Spark connector ('mongo' data source).
df_event_store = (
    spark.read
    .format('mongo')
    .option('spark.mongodb.input.uri', 'mongodb://172.1.0.10:27017/event_store.event_store')
    .option("encoding", "UTF-8")
    .load()
)

In [None]:
# Print the schema of the loaded event-store DataFrame; the later cells rely
# on the top-level date / eventType fields and the nested `data.*` fields.
df_event_store.printSchema()

In [None]:
import pyspark.sql.functions as F 
# Derive three pairwise differences between the date columns, then a signed
# score per event:
#   ADD    ->   wedding_minus_amend  / wedding_minus_created
#   DELETE -> -(amend_minus_created  / wedding_minus_created)
# Any other eventType is left null because the when-chain has no otherwise().
# NOTE(review): assumes the three date columns are numeric so that the
# subtraction and division are plain arithmetic -- confirm against the schema.
df_event_store = (
    df_event_store
    .withColumn(
        "wedding_minus_amend",
        F.col("weddingDate") - F.col("amendDate"),
    )
    .withColumn(
        "wedding_minus_created",
        F.col("weddingDate") - F.col("weddingCreatedAtDate"),
    )
    .withColumn(
        "amend_minus_created",
        F.col("amendDate") - F.col("weddingCreatedAtDate"),
    )
    .withColumn(
        "score_per_record",
        F.when(
            F.col("eventType") == "ADD",
            F.col("wedding_minus_amend") / F.col("wedding_minus_created"),
        ).when(
            F.col("eventType") == "DELETE",
            -(F.col("amend_minus_created") / F.col("wedding_minus_created")),
        ),
    )
)

# Average the per-event score for each budget description.
avg_score_df = (
    df_event_store
    .groupBy("data.budget_description_id")
    .avg("score_per_record")
    .withColumnRenamed("avg(score_per_record)", "avg_score")
)



In [None]:
# Number of distinct weddings in the whole event store; used below as the
# denominator for every description (it is not a per-description count).
num_of_unique_wedding_id = (
    df_event_store
    .select("data.wedding_event_id")
    .distinct()
    .count()
)

# Signed expenditure per event: ADD contributes +expenditure, DELETE
# contributes -expenditure, any other eventType stays null.
df_event_store = df_event_store.withColumn(
    "exp_per_record",
    F.when(F.col("eventType") == "ADD", F.col("data.expenditure"))
    .when(F.col("eventType") == "DELETE", -F.col("data.expenditure")),
)

# Net expenditure per budget description.
df_exp = (
    df_event_store
    .groupBy("data.budget_description_id")
    .sum("exp_per_record")
    .withColumnRenamed("sum(exp_per_record)", "total_exp")
)

# Average expenditure of each description across all distinct weddings.
df_exp = df_exp.withColumn(
    "avg_exp",
    F.col("total_exp") / num_of_unique_wedding_id,
)

In [None]:
# Combine the per-description average score with the expenditure figures
# (default inner join on the shared key column).
final_df = avg_score_df.join(df_exp, on="budget_description_id")

In [None]:
# One row per (description, budget_cat_id, budget_description_id) combination
# found in the event store, with the number of events in each group.
tmp_df = (
    df_event_store
    .groupBy(
        "data.description",
        "data.budget_cat_id",
        "data.budget_description_id",
    )
    .count()
)

In [None]:
# Attach the description / category metadata (and the per-group event count)
# to the per-description metrics.
# Built from the upstream frames rather than reassigning final_df from itself,
# so re-running this cell does not join tmp_df a second time and leave
# duplicate description / budget_cat_id / count columns behind.
final_df = (
    avg_score_df
    .join(df_exp, ["budget_description_id"])
    .join(tmp_df, ["budget_description_id"])
)
final_df

In [None]:
# Keep only the columns that are written out, in their output order, and
# preview the result.
final_df = final_df.select(
    "budget_description_id",
    "avg_score",
    "avg_exp",
    "description",
    "budget_cat_id",
)
final_df.show()

In [None]:
import os
from getpass import getpass

# Connection settings for the PostgreSQL warehouse. Each value can be
# overridden through an environment variable of the same name; the defaults
# for database, user and host are the values this notebook used before.
RDS_DB = os.environ.get("RDS_DB", "wedding")
RDS_USERNAME = os.environ.get("RDS_USERNAME", "postgres")
RDS_HOST = os.environ.get(
    "RDS_HOST",
    "final-project-data-warehouse.cit8sojr7959.ap-southeast-1.rds.amazonaws.com",
)
# The password is no longer stored in the notebook: it is read from the
# RDS_PASSWORD environment variable, or prompted for when that is unset/empty.
RDS_PASSWORD = os.environ.get("RDS_PASSWORD") or getpass("RDS password: ")

# Write the aggregated result to the `event_store_budget_planning` table over JDBC.
# NOTE: mode('append') adds rows on every execution, so re-running this cell
# inserts the same aggregates again.
(
    final_df.write.format('jdbc')
    .option('url', "jdbc:postgresql://{}/{}".format(RDS_HOST, RDS_DB))
    .option('dbtable', 'event_store_budget_planning')
    .option('user', RDS_USERNAME)
    .option('password', RDS_PASSWORD)
    .option('driver', 'org.postgresql.Driver')
    .mode('append')
    .save()
)