In [1]:
from pyspark.sql.functions import *

In [2]:
hdfs_path="/tmp/input_data/"

In [17]:
ad_campaigns_df=spark.read.format("json")\
                .option("multiline", "true")\
                .load(hdfs_path+"ad_campaigns_data.json")

In [18]:
ad_campaigns_df.show()

+----------------+-----------+--------------------+-----------+--------------------+----------+-------+---------+-------------------+
|campaign_country|campaign_id|       campaign_name|device_type|          event_time|event_type|os_type| place_id|            user_id|
+----------------+-----------+--------------------+-----------+--------------------+----------+-------+---------+-------------------+
|             USA|    ABCDFAE|Food category tar...|      apple|2018-10-12T13:10:...|impression|    ios|CASSBB-11|1264374214654454321|
|             USA|    ABCDFAE|Food category tar...|   MOTOROLA|2018-10-12T13:09:...|impression|android|CADGBD-13|1674374214654454321|
|             USA|    ABCDFAE|Food category tar...|    SAMSUNG|2018-10-12T13:10:...|  video ad|android|BADGBA-12|   5747421465445443|
|             USA|    ABCDFAE|Food category tar...|    SAMSUNG|2018-10-12T13:10:...|     click|android|CASSBB-11|1864374214654454132|
+----------------+-----------+--------------------+-----------

In [20]:
ad_campaigns_df.select("event_time").show()

+--------------------+
|          event_time|
+--------------------+
|2018-10-12T13:10:...|
|2018-10-12T13:09:...|
|2018-10-12T13:10:...|
|2018-10-12T13:10:...|
+--------------------+



In [7]:
user_profile_df=spark.read.format("json")\
                .option("multiline", "true")\
                .load(hdfs_path+"user_profile_data.json")
user_profile_df.show()

+---------+--------------------+-------+------+-------------------+
|age_group|            category|country|gender|            user_id|
+---------+--------------------+-------+------+-------------------+
|    18-25|  [shopper, student]|    USA|  male|1264374214654454321|
|    25-50|            [parent]|    USA|female|1674374214654454321|
|    25-50|[shopper, parent,...|    USA|  male|   5747421465445443|
|      50+|      [professional]|    USA|  male|1864374214654454132|
|    18-25|  [shopper, student]|    USA|female|  14537421465445443|
|      50+|[shopper, profess...|    USA|female|  25547421465445443|
+---------+--------------------+-------+------+-------------------+



In [8]:
store_df=spark.read.format("json")\
                .option("multiline", "true")\
                .load(hdfs_path+"store_data.json")
store_df.show()

+--------------------+-------------+
|           place_ids|   store_name|
+--------------------+-------------+
|[CASSBB-11, CADGB...|     McDonald|
|         [CASSBB-11]|   BurgerKing|
|[BADGBA-13, CASSB...|        Macys|
|         [BADGBA-12]|shoppers stop|
+--------------------+-------------+



## Q1. Analyse data for each campaign_id, date, hour, os_type & value to get all the events with counts

In [22]:
ad_campaigns=ad_campaigns_df.groupBy("campaign_id",
                        substring(col("event_time"), 0, 10).alias("date"),
                        substring(col("event_time"),12, 2).alias("hour"),
                        col("os_type"),
                        col("event_type")
                       ).agg(count("event_type").alias("events"))\
                        .selectExpr(
                          "campaign_id",
                          "date",
                          "hour",
                          "'os_type' as type",
                          "os_type as value",
                          "struct(event_type, events) as event"
                          ) \
                          .groupBy("campaign_id", "date", "hour", "type", "value") \
                          .agg(collect_list("event").alias("events")) \
                          .selectExpr(
                              "campaign_id",
                              "date",
                              "hour",
                              "type",
                              "value",
                              "map_from_entries(events) as event"
                          )
ad_campaigns.show()

[Stage 14:>                                                         (0 + 1) / 1]

+-----------+----------+----+-------+-------+--------------------+
|campaign_id|      date|hour|   type|  value|               event|
+-----------+----------+----+-------+-------+--------------------+
|    ABCDFAE|2018-10-12|  13|os_type|android|{click -> 1, impr...|
|    ABCDFAE|2018-10-12|  13|os_type|    ios|   {impression -> 1}|
+-----------+----------+----+-------+-------+--------------------+



                                                                                

In [30]:
ad_campaigns_df.select(substring(col("event_time"), 0, 10).alias('date')).show()

+----------+
|      date|
+----------+
|2018-10-12|
|2018-10-12|
|2018-10-12|
|2018-10-12|
+----------+



In [32]:
ad_campaigns_df.select(substring(col("event_time"), 12,2).alias('hour')).show()

+----+
|hour|
+----+
|  13|
|  13|
|  13|
|  13|
+----+



## Q1-1 Store processed json data of each problem statement into different HDFS output directory

In [35]:
ad_campaigns.coalesce(1).write.format('json').save('/tmp/output_data/ad_campaign/')
print("Write Successfull")

Write Successfull


## Q2.Analyse data for each campaign_id, date, hour, store_name & value to get all the events with counts

In [36]:
stores=ad_campaigns_df.join(store_df, array_contains(store_df.place_ids, ad_campaigns_df.place_id),"left")\
                    .groupBy("campaign_id",
                             substring("event_time", 0, 10).alias('date'),
                             substring("event_time", 12, 2).alias('hour'),
                             "store_name",
                             "event_type"
                             ).agg(count("event_type").alias('events'))\
                    .selectExpr("campaign_id",
                            "date",
                            "hour",
                            "'store_name' as type",
                            "store_name as value",
                            "struct(event_type, events) as event_dict")\
                    .groupBy("campaign_id",
                            "date",
                            "hour",
                            "type",
                            "value"
                            ).agg(collect_list("event_dict").alias('event'))\
                    .select("campaign_id",
                            "date",
                            "hour",
                            "type",
                            "value",
                            map_from_entries("event").alias('event'))

In [37]:
stores.show()

+-----------+----------+----+----------+-------------+--------------------+
|campaign_id|      date|hour|      type|        value|               event|
+-----------+----------+----+----------+-------------+--------------------+
|    ABCDFAE|2018-10-12|  13|store_name|   BurgerKing|{impression -> 1,...|
|    ABCDFAE|2018-10-12|  13|store_name|     McDonald|{click -> 1, impr...|
|    ABCDFAE|2018-10-12|  13|store_name|shoppers stop|     {video ad -> 1}|
+-----------+----------+----+----------+-------------+--------------------+



## Q2-1 write the data into HDFS output directory





In [40]:
stores.coalesce(1).write.format('json').save('/tmp/output_data/store/')
print("Write successful")

Write successful


## Q3.Analyse data for each campaign_id, date, hour, gender_type & value to get all the events with counts

In [41]:
user_profile=ad_campaigns_df.join(user_profile_df, ad_campaigns_df.user_id == user_profile_df.user_id, "left")\
                            .select("campaign_id",
                                    substring("event_time", 0, 10).alias("date"),
                                    substring("event_time", 12, 2).alias("hour"),
                                    lit('gender').alias("type"),
                                    col("gender").alias("value"),
                                    "event_type")\
                            .groupBy("campaign_id", "date", "hour", "type", "value", "event_type")\
                            .agg(count("event_type").alias("event_count"))\
                            .select("campaign_id", "date", "hour", "type", "value", struct("event_type", "event_count").alias("events_map"))\
                            .groupBy("campaign_id", "date", "hour", "type", "value")\
                            .agg(collect_list("events_map").alias("map_list"))\
                            .select("campaign_id", "date", "hour", "type", "value", map_from_entries("map_list").alias("event"))
                                   
user_profile.show()

+-----------+----------+----+------+------+--------------------+
|campaign_id|      date|hour|  type| value|               event|
+-----------+----------+----+------+------+--------------------+
|    ABCDFAE|2018-10-12|  13|gender|  male|{impression -> 1,...|
|    ABCDFAE|2018-10-12|  13|gender|female|   {impression -> 1}|
+-----------+----------+----+------+------+--------------------+



## Q3-1 write the data into HDFS output directory

In [42]:
user_profile.coalesce(1).write.format('json').save('/tmp/output_data/user_profile')
print("Write successfull")

Write successfull
