In [161]:
from pyspark.sql import SparkSession
import getpass
username = getpass.getuser()
spark = SparkSession. \
builder. \
config('spark.ui.port', '0'). \
appName("week13").\
config('spark.shuffle.useOldFetchProtocol', 'true'). \
config("spark.sql.warehouse.dir", f"/user/itv009033/warehouse"). \
enableHiveSupport(). \
master('yarn'). \
getOrCreate()

In [162]:
df = spark.read.format("csv").load("landing/cust_trnsf/cust_transf.csv")
df.show()

+----+----------+----+-----+
| _c0|       _c1| _c2|  _c3|
+----+----------+----+-----+
|1001|2023-05-15|1001|49.99|
|1002|2023-05-16|1002|29.99|
|1003|2023-05-17|1003|39.99|
|1004|2023-05-18|1004|19.99|
|1005|2023-05-19|1005|24.99|
|1001|2023-05-20|1002|29.99|
|1002|2023-05-21|1003|39.99|
|1003|2023-05-22|1004|19.99|
|1004|2023-05-23|1005|24.99|
|1005|2023-05-24|1001|49.99|
|1001|2023-05-25|1003|39.99|
|1002|2023-05-26|1004|19.99|
|1003|2023-05-27|1005|24.99|
|1004|2023-05-28|1001|49.99|
|1005|2023-05-29|1002|29.99|
|1001|2023-05-30|1003|39.99|
|1002|2023-05-31|1004|19.99|
|1003|2023-06-01|1005|24.99|
|1004|2023-06-02|1001|49.99|
|1005|2023-06-03|1002|29.99|
+----+----------+----+-----+
only showing top 20 rows



In [163]:
df.rdd.getNumPartitions()

18

# 1. Analyze the datasets chosen and come up with an example use-case.

In [164]:
from pyspark.sql.types import StructType, StructField, TimestampType, IntegerType, StringType, DateType, FloatType
from pyspark.sql.functions import from_unixtime, expr

In [165]:
customers_schema = StructType([StructField("o_id", IntegerType(), True)
                              ,StructField("date", DateType(), True)
                              ,StructField("c_id", IntegerType(), True)
                              ,StructField("amount", FloatType(), True)])

In [166]:
cust_transf_df = spark.read.format("csv").schema(customers_schema).load("landing/cust_trnsf/cust_transf.csv")
cust_transf_df.show()

+----+----------+----+------+
|o_id|      date|c_id|amount|
+----+----------+----+------+
|1001|2023-05-15|1001| 49.99|
|1002|2023-05-16|1002| 29.99|
|1003|2023-05-17|1003| 39.99|
|1004|2023-05-18|1004| 19.99|
|1005|2023-05-19|1005| 24.99|
|1001|2023-05-20|1002| 29.99|
|1002|2023-05-21|1003| 39.99|
|1003|2023-05-22|1004| 19.99|
|1004|2023-05-23|1005| 24.99|
|1005|2023-05-24|1001| 49.99|
|1001|2023-05-25|1003| 39.99|
|1002|2023-05-26|1004| 19.99|
|1003|2023-05-27|1005| 24.99|
|1004|2023-05-28|1001| 49.99|
|1005|2023-05-29|1002| 29.99|
|1001|2023-05-30|1003| 39.99|
|1002|2023-05-31|1004| 19.99|
|1003|2023-06-01|1005| 24.99|
|1004|2023-06-02|1001| 49.99|
|1005|2023-06-03|1002| 29.99|
+----+----------+----+------+
only showing top 20 rows



In [167]:
temp_df = cust_transf_df

In [168]:
cust_transf_df.printSchema()

root
 |-- o_id: integer (nullable = true)
 |-- date: date (nullable = true)
 |-- c_id: integer (nullable = true)
 |-- amount: float (nullable = true)



# Use Case 1 : 

* Let's breakdown the date column and find out the amount collected per month, year and Quarter

In [169]:
import datetime
from pyspark.sql.functions import *
from pyspark.sql.functions import year, month, dayofmonth, quarter

In [170]:
cust_transf_df = cust_transf_df.withColumn("year", year("date").alias("year"))
cust_transf_df = cust_transf_df.withColumn("month", month("date").alias("month"))
cust_transf_df = cust_transf_df.withColumn("quarter", quarter("date").alias("quarter"))
cust_transf_df = cust_transf_df.withColumn("day", dayofmonth("date").alias("day"))
cust_transf_df.show()

+----+----------+----+------+----+-----+-------+---+
|o_id|      date|c_id|amount|year|month|quarter|day|
+----+----------+----+------+----+-----+-------+---+
|1001|2023-05-15|1001| 49.99|2023|    5|      2| 15|
|1002|2023-05-16|1002| 29.99|2023|    5|      2| 16|
|1003|2023-05-17|1003| 39.99|2023|    5|      2| 17|
|1004|2023-05-18|1004| 19.99|2023|    5|      2| 18|
|1005|2023-05-19|1005| 24.99|2023|    5|      2| 19|
|1001|2023-05-20|1002| 29.99|2023|    5|      2| 20|
|1002|2023-05-21|1003| 39.99|2023|    5|      2| 21|
|1003|2023-05-22|1004| 19.99|2023|    5|      2| 22|
|1004|2023-05-23|1005| 24.99|2023|    5|      2| 23|
|1005|2023-05-24|1001| 49.99|2023|    5|      2| 24|
|1001|2023-05-25|1003| 39.99|2023|    5|      2| 25|
|1002|2023-05-26|1004| 19.99|2023|    5|      2| 26|
|1003|2023-05-27|1005| 24.99|2023|    5|      2| 27|
|1004|2023-05-28|1001| 49.99|2023|    5|      2| 28|
|1005|2023-05-29|1002| 29.99|2023|    5|      2| 29|
|1001|2023-05-30|1003| 39.99|2023|    5|      

In [171]:
transformation_1 = cust_transf_df.groupBy("month").agg(sum("amount").alias("total_amount"))
transformation_1.show()

+-----+-------------------+
|month|       total_amount|
+-----+-------------------+
|    6|2.003853912477951E9|
|    5|8.827241263519745E8|
+-----+-------------------+



In [172]:
transformation_1.count()

2

In [173]:
transformation_1.show(35)

+-----+-------------------+
|month|       total_amount|
+-----+-------------------+
|    6|2.003853912477951E9|
|    5|8.827241263519745E8|
+-----+-------------------+



In [174]:
customers =  cust_transf_df.select("o_id", "c_id", "date","month", "day", "amount")
customers.show()

+----+----+----------+-----+---+------+
|o_id|c_id|      date|month|day|amount|
+----+----+----------+-----+---+------+
|1001|1001|2023-05-15|    5| 15| 49.99|
|1002|1002|2023-05-16|    5| 16| 29.99|
|1003|1003|2023-05-17|    5| 17| 39.99|
|1004|1004|2023-05-18|    5| 18| 19.99|
|1005|1005|2023-05-19|    5| 19| 24.99|
|1001|1002|2023-05-20|    5| 20| 29.99|
|1002|1003|2023-05-21|    5| 21| 39.99|
|1003|1004|2023-05-22|    5| 22| 19.99|
|1004|1005|2023-05-23|    5| 23| 24.99|
|1005|1001|2023-05-24|    5| 24| 49.99|
|1001|1003|2023-05-25|    5| 25| 39.99|
|1002|1004|2023-05-26|    5| 26| 19.99|
|1003|1005|2023-05-27|    5| 27| 24.99|
|1004|1001|2023-05-28|    5| 28| 49.99|
|1005|1002|2023-05-29|    5| 29| 29.99|
|1001|1003|2023-05-30|    5| 30| 39.99|
|1002|1004|2023-05-31|    5| 31| 19.99|
|1003|1005|2023-06-01|    6|  1| 24.99|
|1004|1001|2023-06-02|    6|  2| 49.99|
|1005|1002|2023-06-03|    6|  3| 29.99|
+----+----+----------+-----+---+------+
only showing top 20 rows



In [175]:
customers = customers.cache()

In [176]:
customers.count()

87498290

In [177]:
customers.rdd.getNumPartitions()

18

In [178]:
temp_df.createOrReplaceTempView("customers")

In [179]:
spark.sql('''select * from customers''').show()

+----+----------+----+------+
|o_id|      date|c_id|amount|
+----+----------+----+------+
|1001|2023-05-15|1001| 49.99|
|1002|2023-05-16|1002| 29.99|
|1003|2023-05-17|1003| 39.99|
|1004|2023-05-18|1004| 19.99|
|1005|2023-05-19|1005| 24.99|
|1001|2023-05-20|1002| 29.99|
|1002|2023-05-21|1003| 39.99|
|1003|2023-05-22|1004| 19.99|
|1004|2023-05-23|1005| 24.99|
|1005|2023-05-24|1001| 49.99|
|1001|2023-05-25|1003| 39.99|
|1002|2023-05-26|1004| 19.99|
|1003|2023-05-27|1005| 24.99|
|1004|2023-05-28|1001| 49.99|
|1005|2023-05-29|1002| 29.99|
|1001|2023-05-30|1003| 39.99|
|1002|2023-05-31|1004| 19.99|
|1003|2023-06-01|1005| 24.99|
|1004|2023-06-02|1001| 49.99|
|1005|2023-06-03|1002| 29.99|
+----+----------+----+------+
only showing top 20 rows



In [180]:
spark.sql("""select o_id, first(date_format(date, 'MM')) as mth, sum(amount) as total_amount from customers group by o_id order by mth desc""").show()

+----+---+--------------------+
|o_id|mth|        total_amount|
+----+---+--------------------+
|1008| 06|1.3517889965189552E8|
|1005| 06| 2.624090592137146E8|
|1010| 06|1.6699778965229416E8|
|1015| 06|1.6700301465229416E8|
|1014| 06|1.5109356965229416E8|
|1007| 06|1.4313205465189552E8|
|1011| 06|1.9086143768658066E8|
|1012| 06|1.4313727965229416E8|
|1006| 06|1.9085621268618202E8|
|1009| 06|1.5108834465189552E8|
|1013| 06|1.3518412465229416E8|
|1002| 05| 2.067296592137146E8|
|1001| 05| 3.180884683165741E8|
|1003| 05| 2.146838592137146E8|
|1004| 05| 3.101342652822876E8|
+----+---+--------------------+



In [181]:
spark.sql("""select o_id, first(int(date_format(date, 'MM'))) as mth, sum(amount) as total_amount from customers group by o_id order by mth desc""").show()

+----+---+--------------------+
|o_id|mth|        total_amount|
+----+---+--------------------+
|1006|  6|1.9085621268618202E8|
|1015|  6|1.6700301465229416E8|
|1007|  6|1.4313205465189552E8|
|1010|  6|1.6699778965229416E8|
|1014|  6|1.5109356965229416E8|
|1011|  6|1.9086143768658066E8|
|1012|  6|1.4313727965229416E8|
|1009|  6|1.5108834465189552E8|
|1013|  6|1.3518412465229416E8|
|1008|  6|1.3517889965189552E8|
|1005|  5| 2.624090592137146E8|
|1002|  5| 2.067296592137146E8|
|1003|  5| 2.146838592137146E8|
|1001|  5| 3.180884683165741E8|
|1004|  5| 3.101342652822876E8|
+----+---+--------------------+



In [182]:
spark.sql('''select * from customers''').show()

+----+----------+----+------+
|o_id|      date|c_id|amount|
+----+----------+----+------+
|1001|2023-05-15|1001| 49.99|
|1002|2023-05-16|1002| 29.99|
|1003|2023-05-17|1003| 39.99|
|1004|2023-05-18|1004| 19.99|
|1005|2023-05-19|1005| 24.99|
|1001|2023-05-20|1002| 29.99|
|1002|2023-05-21|1003| 39.99|
|1003|2023-05-22|1004| 19.99|
|1004|2023-05-23|1005| 24.99|
|1005|2023-05-24|1001| 49.99|
|1001|2023-05-25|1003| 39.99|
|1002|2023-05-26|1004| 19.99|
|1003|2023-05-27|1005| 24.99|
|1004|2023-05-28|1001| 49.99|
|1005|2023-05-29|1002| 29.99|
|1001|2023-05-30|1003| 39.99|
|1002|2023-05-31|1004| 19.99|
|1003|2023-06-01|1005| 24.99|
|1004|2023-06-02|1001| 49.99|
|1005|2023-06-03|1002| 29.99|
+----+----------+----+------+
only showing top 20 rows



In [183]:
spark.sql("""select o_id, c_id from (select o_id, c_id from customers where o_id <1010 ) where o_id <1007 order by o_id asc """).show()

+----+----+
|o_id|c_id|
+----+----+
|1001|1001|
|1001|1001|
|1001|1003|
|1001|1003|
|1001|1003|
|1001|1001|
|1001|1002|
|1001|1003|
|1001|1003|
|1001|1003|
|1001|1001|
|1001|1002|
|1001|1002|
|1001|1003|
|1001|1003|
|1001|1003|
|1001|1001|
|1001|1002|
|1001|1003|
|1001|1003|
+----+----+
only showing top 20 rows



In [130]:
spark.sql("""select o_id, c_id from (select o_id, c_id from customers where o_id <1010 ) where o_id <1007 order by o_id asc """).explain(True)

== Parsed Logical Plan ==
'Sort ['o_id ASC NULLS FIRST], true
+- 'Project ['o_id, 'c_id]
   +- 'Filter ('o_id < 1007)
      +- 'SubqueryAlias __auto_generated_subquery_name
         +- 'Project ['o_id, 'c_id]
            +- 'Filter ('o_id < 1010)
               +- 'UnresolvedRelation [customers], [], false

== Analyzed Logical Plan ==
o_id: int, c_id: int
Sort [o_id#2385 ASC NULLS FIRST], true
+- Project [o_id#2385, c_id#2387]
   +- Filter (o_id#2385 < 1007)
      +- SubqueryAlias __auto_generated_subquery_name
         +- Project [o_id#2385, c_id#2387]
            +- Filter (o_id#2385 < 1010)
               +- SubqueryAlias customers
                  +- Relation[o_id#2385,date#2386,c_id#2387,amount#2388] csv

== Optimized Logical Plan ==
Sort [o_id#2385 ASC NULLS FIRST], true
+- Project [o_id#2385, c_id#2387]
   +- Filter ((isnotnull(o_id#2385) AND (o_id#2385 < 1010)) AND (o_id#2385 < 1007))
      +- Relation[o_id#2385,date#2386,c_id#2387,amount#2388] csv

== Physical Plan ==
*(2) So

In [184]:
temp_df.show()

+----+----------+----+------+
|o_id|      date|c_id|amount|
+----+----------+----+------+
|1001|2023-05-15|1001| 49.99|
|1002|2023-05-16|1002| 29.99|
|1003|2023-05-17|1003| 39.99|
|1004|2023-05-18|1004| 19.99|
|1005|2023-05-19|1005| 24.99|
|1001|2023-05-20|1002| 29.99|
|1002|2023-05-21|1003| 39.99|
|1003|2023-05-22|1004| 19.99|
|1004|2023-05-23|1005| 24.99|
|1005|2023-05-24|1001| 49.99|
|1001|2023-05-25|1003| 39.99|
|1002|2023-05-26|1004| 19.99|
|1003|2023-05-27|1005| 24.99|
|1004|2023-05-28|1001| 49.99|
|1005|2023-05-29|1002| 29.99|
|1001|2023-05-30|1003| 39.99|
|1002|2023-05-31|1004| 19.99|
|1003|2023-06-01|1005| 24.99|
|1004|2023-06-02|1001| 49.99|
|1005|2023-06-03|1002| 29.99|
+----+----------+----+------+
only showing top 20 rows



In [207]:
schema_evol_schema = StructType([StructField("o_id", IntegerType(), True)
                              ,StructField("date", DateType(), True)
                              ,StructField("c_id", StringType(), True)
                              ,StructField("amount", FloatType(), True)
                              ,StructField("status", StringType(), True)])

In [208]:
schema_evol = spark.read.format("csv").schema(schema_evol_schema).load("landing/cust_trnsf/")

In [209]:
schema_evol.show()

+----+----------+----+------+------+
|o_id|      date|c_id|amount|status|
+----+----------+----+------+------+
|1001|2023-05-15|1001| 49.99|  null|
|1002|2023-05-16|1002| 29.99|  null|
|1003|2023-05-17|1003| 39.99|  null|
|1004|2023-05-18|1004| 19.99|  null|
|1005|2023-05-19|1005| 24.99|  null|
|1001|2023-05-20|1002| 29.99|  null|
|1002|2023-05-21|1003| 39.99|  null|
|1003|2023-05-22|1004| 19.99|  null|
|1004|2023-05-23|1005| 24.99|  null|
|1005|2023-05-24|1001| 49.99|  null|
|1001|2023-05-25|1003| 39.99|  null|
|1002|2023-05-26|1004| 19.99|  null|
|1003|2023-05-27|1005| 24.99|  null|
|1004|2023-05-28|1001| 49.99|  null|
|1005|2023-05-29|1002| 29.99|  null|
|1001|2023-05-30|1003| 39.99|  null|
|1002|2023-05-31|1004| 19.99|  null|
|1003|2023-06-01|1005| 24.99|  null|
|1004|2023-06-02|1001| 49.99|  null|
|1005|2023-06-03|1002| 29.99|  null|
+----+----------+----+------+------+
only showing top 20 rows



In [215]:
schema_evol.write.format("parquet").option("mergeSchema", True).mode("overwrite").option("compression","gzip").option("path","/user/itv009033/landing/cust_trnsf_parquet/").save()

In [211]:
schema_evol_parquet = spark.read.format("parquet").load("landing/cust_trnsf_parquet/")

In [212]:
schema_evol_parquet.show()

+----+----------+----+------+------+
|o_id|      date|c_id|amount|status|
+----+----------+----+------+------+
|1010|2023-06-10|1001| 49.99|  null|
|1006|2023-06-11|1003| 39.99|  null|
|1007|2023-06-12|1004| 19.99|  null|
|1008|2023-06-13|1005| 24.99|  null|
|1009|2023-06-14|1001| 49.99|  null|
|1010|2023-06-15|1002| 29.99|  null|
|1011|2023-06-01|1001| 49.99|  null|
|1012|2023-06-02|1002| 29.99|  null|
|1013|2023-06-03|1003| 39.99|  null|
|1014|2023-06-04|1004| 19.99|  null|
|1015|2023-06-05|1005| 24.99|  null|
|1011|2023-06-06|1002| 29.99|  null|
|1012|2023-06-07|1003| 39.99|  null|
|1013|2023-06-08|1004| 19.99|  null|
|1014|2023-06-09|1005| 24.99|  null|
|1015|2023-06-10|1001| 49.99|  null|
|1011|2023-06-11|1003| 39.99|  null|
|1012|2023-06-12|1004| 19.99|  null|
|1013|2023-06-13|1005| 24.99|  null|
|1014|2023-06-14|1001| 49.99|  null|
+----+----------+----+------+------+
only showing top 20 rows



In [213]:
schema_evol_parquet.tail(50)

[Row(o_id=1011, date=datetime.date(2023, 5, 27), c_id='1011', amount=39.9900016784668, status=None),
 Row(o_id=1012, date=datetime.date(2023, 5, 28), c_id='1012', amount=44.9900016784668, status=None),
 Row(o_id=1013, date=datetime.date(2023, 5, 29), c_id='1013', amount=49.9900016784668, status=None),
 Row(o_id=1014, date=datetime.date(2023, 5, 30), c_id='1014', amount=54.9900016784668, status=None),
 Row(o_id=1015, date=datetime.date(2023, 5, 31), c_id='1015', amount=59.9900016784668, status=None),
 Row(o_id=1006, date=datetime.date(2023, 5, 22), c_id='1006', amount=14.989999771118164, status=None),
 Row(o_id=1007, date=datetime.date(2023, 5, 23), c_id='1007', amount=19.989999771118164, status=None),
 Row(o_id=1008, date=datetime.date(2023, 5, 24), c_id='1008', amount=24.989999771118164, status=None),
 Row(o_id=1009, date=datetime.date(2023, 5, 25), c_id='1009', amount=29.989999771118164, status=None),
 Row(o_id=1010, date=datetime.date(2023, 5, 26), c_id='1010', amount=34.99000167846

In [218]:
schema_evol.write.format("orc").option("mergeSchema", True).mode("overwrite").option("compression","lzo").option("path","/user/itv009033/landing/cust_trnsf_orc/").save()