# Scala Version

In [2]:
%scala 
// Load the cable viewing log: column names come from the first row and Spark
// infers each column's type from the data.
val cable_df = spark.read
  .options(Map("header" -> "true", "inferSchema" -> "true"))
  .csv("/FileStore/tables/cable-3.csv")

display(cable_df)

viewing_timestamp,household_id,ad_id
2020-08-12 20:01:20,1,1001
2020-08-17 22:22:15,1,1001
2020-08-21 05:33:33,1,1002
2020-08-22 22:52:28,2,1001
2020-08-30 17:02:06,2,1002


In [3]:
%scala
// Print the column names and types inferred for cable_df (read with inferSchema = true).
cable_df.printSchema()

In [4]:
%scala 
// Load the OTT viewing log: column names come from the first row and Spark
// infers each column's type from the data.
val ott_df = spark.read
  .options(Map("header" -> "true", "inferSchema" -> "true"))
  .csv("/FileStore/tables/ott-3.csv")

display(ott_df)

viewing_timestamp,household_id,ad_id
2020-08-22 22:23:12,1,1002
2020-08-22 13:34:32,2,1001
2020-08-28 05:42:18,2,1002
2020-08-29 21:12:16,3,1001


In [5]:
%scala 
// Load the ad -> advertiser lookup: column names come from the first row and
// Spark infers each column's type from the data.
val ads_df = spark.read
  .options(Map("header" -> "true", "inferSchema" -> "true"))
  .csv("/FileStore/tables/ads-3.csv")

display(ads_df)

ad_id,advertiser
1001,Nike
1002,Nike
1003,Coke


In [6]:
%scala

import org.apache.spark.sql.functions.{lit, from_unixtime, unix_timestamp, date_format, hour}

// Every cable and OTT view is tagged with its own service and tagged a second time as "both",
// so the aggregate below yields per-service counts plus a combined total. union() matches
// columns by position, so cable_df and ott_df must share the same column order.
// viewing_month is the "yyyy-MM" of the view and hour its hour of day as an integer; only views
// with hour 20, 21 or 22 (20:00:00-22:59:59) are kept.
val _all = cable_df.withColumn("service", lit("cable"))
  .union(ott_df.withColumn("service", lit("ott")))
  .union(cable_df.withColumn("service", lit("both")))
  .union(ott_df.withColumn("service", lit("both")))
  .withColumn("viewing_month", date_format($"viewing_timestamp", "yyyy-MM"))
  .withColumn("hour", hour($"viewing_timestamp"))
  .filter($"hour".between(20, 22))

// Count views per (advertiser, viewing_month, service). Dataset.as("ads") would only alias the
// Dataset and leave the aggregate column named "count"; rename it so the column is "ads",
// matching the SQL version of this query.
val result_df = _all
  .join(ads_df, _all("ad_id") === ads_df("ad_id"))
  .groupBy("advertiser", "viewing_month", "service")
  .count()
  .withColumnRenamed("count", "ads")

// Variant that hash-partitions into 2 partitions by the grouping keys before aggregating:
// val result_df = _all.join(ads_df, _all("ad_id") === ads_df("ad_id")).repartition(2, $"advertiser", $"viewing_month", $"service").groupBy("advertiser", "viewing_month", "service").count().withColumnRenamed("count", "ads")

result_df.show()

In [7]:
%scala
// default partition 200: prints the physical plan of result_df. This reflects the default
// shuffle partition count (spark.sql.shuffle.partitions, default 200) only when cell 6 was run
// with the active result_df definition (no repartition).
result_df.explain()

In [8]:
%scala
// set repartition to 2: same call as the previous cell. It only shows the repartition-to-2 plan
// after cell 6 has been re-run with its commented-out repartition(...) variant of result_df.
result_df.explain()

In [9]:
%scala
// Persist result_df as a managed Parquet table in the metastore.
// mode("overwrite") makes the cell re-runnable: the default save mode fails when the table already exists.
val permanent_table_name = "result_scala"
result_df.write.format("parquet").mode("overwrite").saveAsTable(permanent_table_name)

In [10]:
%scala
// Read the saved table's Parquet files back directly. The path is built from the configured
// warehouse directory rather than hard-coded (assumes the table lives in the default database).
val read_parquet_df = spark.read.parquet(s"${spark.conf.get("spark.sql.warehouse.dir")}/result_scala")
read_parquet_df.show()

# PySpark Version

In [12]:
# Where the cable viewing log lives and how it is stored
file_location = "/FileStore/tables/cable-3.csv"
file_type = "csv"

# Reader settings: no schema inference (every column stays a string), column names
# taken from the first row, comma-separated fields.
infer_schema = "false"
first_row_is_header = "true"
delimiter = ","

# These reader options are CSV-specific; other formats ignore them.
cable_df = (
    spark.read.format(file_type)
    .options(inferSchema=infer_schema, header=first_row_is_header, sep=delimiter)
    .load(file_location)
)

display(cable_df)

viewing_timestamp,household_id,ad_id
2020-08-12 20:01:20,1,1001
2020-08-17 22:22:15,1,1001
2020-08-21 05:33:33,1,1002
2020-08-22 22:52:28,2,1001
2020-08-30 17:02:06,2,1002


In [13]:
# Print cable_df's schema; it was read with inferSchema = "false", so columns are strings.
cable_df.printSchema()

In [14]:
# Where the OTT viewing log lives and how it is stored
file_location = "/FileStore/tables/ott-3.csv"
file_type = "csv"

# Reader settings: no schema inference (every column stays a string), column names
# taken from the first row, comma-separated fields.
infer_schema = "false"
first_row_is_header = "true"
delimiter = ","

# These reader options are CSV-specific; other formats ignore them.
ott_df = (
    spark.read.format(file_type)
    .options(inferSchema=infer_schema, header=first_row_is_header, sep=delimiter)
    .load(file_location)
)

display(ott_df)

viewing_timestamp,household_id,ad_id
2020-08-22 22:23:12,1,1002
2020-08-22 13:34:32,2,1001
2020-08-28 05:42:18,2,1002
2020-08-29 21:12:16,3,1001


In [15]:
# Where the ad -> advertiser lookup lives and how it is stored
file_location = "/FileStore/tables/ads-3.csv"
file_type = "csv"

# Reader settings: no schema inference (every column stays a string), column names
# taken from the first row, comma-separated fields.
infer_schema = "false"
first_row_is_header = "true"
delimiter = ","

# These reader options are CSV-specific; other formats ignore them.
ads_df = (
    spark.read.format(file_type)
    .options(inferSchema=infer_schema, header=first_row_is_header, sep=delimiter)
    .load(file_location)
)

display(ads_df)

ad_id,advertiser
1001,Nike
1002,Nike
1003,Coke


In [16]:
from pyspark.sql.functions import year, month, dayofmonth, hour, concat, lit, date_format

# Tag every cable and OTT view with its own service and a second time as 'both', so the
# aggregate yields per-service counts plus a combined total. union() matches columns by
# position, so cable_df and ott_df must share the same column order.
_all = (
    cable_df.withColumn('service', lit('cable'))
    .union(ott_df.withColumn('service', lit('ott')))
    .union(cable_df.withColumn('service', lit('both')))
    .union(ott_df.withColumn('service', lit('both')))
    # 'yyyy-MM' zero-pads the month ("2020-08"), matching the Scala and SQL versions;
    # concat(year, '-', month) would give "2020-8".
    .withColumn('viewing_month', date_format('viewing_timestamp', 'yyyy-MM'))
    .withColumn('hour', hour('viewing_timestamp'))
)

# Count views between 20:00:00 and 22:59:59 per (advertiser, viewing_month, service).
# DataFrame.alias('ads') would only alias the DataFrame and leave the column named 'count',
# so rename the aggregate column explicitly.
result_df = (
    _all.join(ads_df, _all.ad_id == ads_df.ad_id)
    .filter((_all.hour >= 20) & (_all.hour <= 22))
    .repartition(2, ["advertiser", "viewing_month", "service"])
    .groupby(["advertiser", "viewing_month", "service"])
    .count()
    .withColumnRenamed('count', 'ads')
)
# Variant without the explicit repartition (uses spark.sql.shuffle.partitions):
#result_df = _all.join(ads_df, _all.ad_id == ads_df.ad_id).filter((_all.hour>=20) & (_all.hour<=22)).groupby(["advertiser", "viewing_month", "service"]).count().withColumnRenamed('count', 'ads')

result_df.show()


In [17]:
# default: number of partitions of result_df. This shows the default shuffle partitioning only
# when cell 16 was run with its commented-out variant (no explicit repartition).
result_df.rdd.getNumPartitions()

In [18]:
# repartition: same call as the previous cell; reflects repartition(2, ...) only when cell 16
# was run with its active definition of result_df.
result_df.rdd.getNumPartitions()

In [19]:
# Print the physical plan of the current result_df.
result_df.explain()

In [20]:
# Persist result_df as a managed Parquet table in the metastore.
# mode("overwrite") makes the cell re-runnable: the default save mode fails when the table already exists.
permanent_table_name = "result_df_pyspark"
result_df.write.format("parquet").mode("overwrite").saveAsTable(permanent_table_name)

In [21]:
# Read the saved table's Parquet files back directly through the SparkSession (sqlContext is
# the legacy entry point). The path is built from the configured warehouse directory rather
# than hard-coded (assumes the table lives in the default database).
warehouse_dir = spark.conf.get('spark.sql.warehouse.dir')
read_parquet_df = spark.read.parquet(f'{warehouse_dir}/result_df_pyspark')
read_parquet_df.show()

# SQL Version

In [23]:
# Expose ott_df to SQL cells as the temporary view `ott_csv` (replacing any existing view of that name).
temp_table_name = "ott_csv"
ott_df.createOrReplaceTempView(name=temp_table_name)

In [24]:
%sql

/* Preview every row of the OTT temp view */
SELECT * FROM `ott_csv`

viewing_timestamp,household_id,ad_id
2020-08-22 22:23:12,1,1002
2020-08-22 13:34:32,2,1001
2020-08-28 05:42:18,2,1002
2020-08-29 21:12:16,3,1001


In [25]:
# Expose cable_df to SQL cells as the temporary view `cable_csv` (replacing any existing view of that name).
temp_table_name = "cable_csv"
cable_df.createOrReplaceTempView(name=temp_table_name)

In [26]:
%sql

/* Preview every row of the cable temp view */
SELECT * FROM `cable_csv`

viewing_timestamp,household_id,ad_id
2020-08-12 20:01:20,1,1001
2020-08-17 22:22:15,1,1001
2020-08-21 05:33:33,1,1002
2020-08-22 22:52:28,2,1001
2020-08-30 17:02:06,2,1002


In [27]:
# Expose ads_df to SQL cells as the temporary view `ads_csv` (replacing any existing view of that name).
temp_table_name = "ads_csv"
ads_df.createOrReplaceTempView(name=temp_table_name)

In [28]:
%sql

/* Preview every row of the ads temp view */
SELECT * FROM `ads_csv`

ad_id,advertiser
1001,Nike
1002,Nike
1003,Coke


In [29]:
%sql
/* Cable views whose hour (characters 12-13 of "yyyy-MM-dd HH:mm:ss") is 20, 21 or 22 */
select *, substring(viewing_timestamp, 12, 2) as hour
from `cable_csv`
where int(substring(viewing_timestamp, 12, 2)) between 20 and 22

viewing_timestamp,household_id,ad_id,hour
2020-08-12 20:01:20,1,1001,20
2020-08-17 22:22:15,1,1001,22
2020-08-22 22:52:28,2,1001,22


In [30]:
# Use 2 shuffle partitions (Spark's default is 200) for the queries that follow; the data set is tiny.
# spark.conf is the SparkSession API; sqlContext is the legacy entry point.
spark.conf.set("spark.sql.shuffle.partitions", "2")

In [31]:
%sql

/* Count ad views between 20:00:00 and 22:59:59 per advertiser, month and service.
   Each view is tagged with its own service and again as 'both' to get a combined total.
   UNION ALL keeps duplicate views; plain UNION would de-duplicate and undercount.
   substring(..., 1, 7) is the "yyyy-MM" prefix and substring(..., 12, 2) the hour of
   a "yyyy-MM-dd HH:mm:ss" timestamp string. */
select ad.advertiser,
       substring(_all.viewing_timestamp, 1, 7) as viewing_month,
       _all.service,
       count(*) as ads
from (
  select *, 'cable' as service from `cable_csv`
  union all
  select *, 'ott' as service from `ott_csv`
  union all
  select *, 'both' as service from `cable_csv`
  union all
  select *, 'both' as service from `ott_csv`
) as _all
join `ads_csv` as ad on _all.ad_id = ad.ad_id
where int(substring(_all.viewing_timestamp, 12, 2)) between 20 and 22
group by ad.advertiser, substring(_all.viewing_timestamp, 1, 7), _all.service


advertiser,viewing_month,service,ads
Nike,2020-08,cable,3
Nike,2020-08,ott,2
Nike,2020-08,both,5


In [32]:
# Same aggregation as the SQL cell above, built through the SparkSession so it can be saved.
# UNION ALL keeps duplicate views (plain UNION would de-duplicate and undercount);
# substring(..., 1, 7) is the "yyyy-MM" prefix and substring(..., 12, 2) the hour of the
# "yyyy-MM-dd HH:mm:ss" timestamp string.
result_df = spark.sql("""
    select ad.advertiser,
           substring(_all.viewing_timestamp, 1, 7) as viewing_month,
           _all.service,
           count(*) as ads
    from (
      select *, 'cable' as service from `cable_csv`
      union all
      select *, 'ott' as service from `ott_csv`
      union all
      select *, 'both' as service from `cable_csv`
      union all
      select *, 'both' as service from `ott_csv`
    ) as _all
    join `ads_csv` as ad on _all.ad_id = ad.ad_id
    where int(substring(_all.viewing_timestamp, 12, 2)) between 20 and 22
    group by ad.advertiser, substring(_all.viewing_timestamp, 1, 7), _all.service
""")
result_df.show()

In [33]:
# Persist the SQL result as a managed Parquet table in the metastore, so it persists across
# cluster restarts and can be queried from other notebooks (unlike the temp views above).
# mode("overwrite") makes the cell re-runnable: the default save mode fails when the table already exists.
permanent_table_name = "result_sql"
result_df.write.format("parquet").mode("overwrite").saveAsTable(permanent_table_name)