In [1]:
import findspark
findspark.init()

In [2]:
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [3]:
#Creating a spark session with 3 threads
spark = SparkSession.builder.appName('Misc').master("local[3]").getOrCreate()

In [4]:
#Creating a dataframe 
data_list = [("Ravi","28","1","2022"),("Abdul","23","5","81"),("John","12","12","6"),("Rosy","7","8","63"),("Abdul","23","5","81")]
raw_df = spark.createDataFrame(data_list).toDF("name","day","month","year").repartition(3)
raw_df.show()
raw_df.printSchema()

+-----+---+-----+----+
| name|day|month|year|
+-----+---+-----+----+
|Abdul| 23|    5|  81|
| Ravi| 28|    1|2022|
|Abdul| 23|    5|  81|
| John| 12|   12|   6|
| Rosy|  7|    8|  63|
+-----+---+-----+----+

root
 |-- name: string (nullable = true)
 |-- day: string (nullable = true)
 |-- month: string (nullable = true)
 |-- year: string (nullable = true)



In [5]:
#Adding a monotonically increasing ID
df_1 = raw_df.withColumn("id", monotonically_increasing_id())

In [6]:
#Case statement / Creating a custom column
df_3 = df_1.withColumn("year",expr("""
case when year <=21 then year + 2000
     when year <=100 then year + 1900
     else year
     end
""").cast(IntegerType()))

In [7]:
#Casting using type functions
df_4 = df_1.withColumn("day",col("day").cast(IntegerType())).withColumn("month",col("month").cast(IntegerType())).withColumn("year",df_1.year.cast(IntegerType()))    

In [12]:
#Drop Columns
df_5 = df_3.withColumn("dob",expr("""to_date(concat(day,'/',month,'/',year),'d/m/y')
"""))
df_6 = df_5.drop("day","month","year")

In [13]:
#Drop Duplicates
df_7 = df_6.dropDuplicates(["name"])

In [15]:
#Sort
df_8 = df_7.sort(expr("dob desc"))

In [16]:
#Reading spark dataframe
invoice_df = spark.read.format("csv").option("header","true").option("inferSchema","true").load("invoices.csv")

In [17]:
#Agg operations
invoice_df.select(expr("count(*) as total_count")).show()
invoice_df.select(expr("sum(Quantity) as sum_quantity")).show()
invoice_df.select(expr("avg(UnitPrice) as avg_unit_price")).show()
invoice_df.select(expr("count(distinct invoiceno) as distinct_invoice_number")).show()

+-----------+
|total_count|
+-----------+
|     541909|
+-----------+

+------------+
|sum_quantity|
+------------+
|     5176450|
+------------+

+-----------------+
|   avg_unit_price|
+-----------------+
|4.611113626086849|
+-----------------+

+-----------------------+
|distinct_invoice_number|
+-----------------------+
|                  25900|
+-----------------------+



In [18]:
#Group by
invoice_df.createOrReplaceTempView("sales")
summary_sql = spark.sql("select country,invoiceno, sum(quantity) from sales group by 1,2")
summary_sql.show()

+--------------+---------+-------------+
|       country|invoiceno|sum(quantity)|
+--------------+---------+-------------+
|United Kingdom|   536446|          329|
|United Kingdom|   536508|          216|
|United Kingdom|   537018|           -3|
|United Kingdom|   537401|          -24|
|United Kingdom|   537811|           74|
|United Kingdom|  C537824|           -2|
|United Kingdom|   538895|          370|
|United Kingdom|   540453|          341|
|United Kingdom|   541291|          217|
|United Kingdom|   542551|           -1|
|United Kingdom|   542576|           -1|
|United Kingdom|   542628|            9|
|United Kingdom|   542886|          199|
|United Kingdom|   542907|           75|
|United Kingdom|   543131|          134|
|United Kingdom|   543189|          102|
|United Kingdom|   543265|           -4|
|        Cyprus|   544574|          173|
|United Kingdom|   545077|           24|
|United Kingdom|   545300|          116|
+--------------+---------+-------------+
only showing top

In [19]:
#Spark Write
invoice_df_1= invoice_df.withColumn("invoicedate", to_date(col("invoicedate"),"dd-MM-yyyy H.mm"))
invoice_df_2 = invoice_df_1.withColumn("week_of_year", weekofyear(col("invoicedate")))
invoice_df_2.createOrReplaceTempView("sales")
final_df = spark.sql("""
select country,week_of_year,
        count(distinct invoiceno) as NumInvoices,
        sum(quantity) as TotalQuantity,
        round(sum(quantity*unitprice),2) as InvoiceValue from sales  
        where year(invoicedate) = 2010
group by 1,2""")
final_df.coalesce(1).write.format("parquet").option("path","invoice_file.parquet").mode("overwrite").save()
final_df.show(5)

TypeError: to_date() takes 1 positional argument but 2 were given

In [21]:
#Window Aggregations
running_total_window = Window.partitionBy("Country").orderBy("week_of_year").rowsBetween(Window.unboundedPreceding,Window.currentRow)
final_df = final_df.withColumn("running_total",sum("InvoiceValue").over(running_total_window))
final_df.sort("Country","week_of_year").show(10)

+---------------+------------+-----------+-------------+------------+------------------+
|        country|week_of_year|NumInvoices|TotalQuantity|InvoiceValue|     running_total|
+---------------+------------+-----------+-------------+------------+------------------+
|      Australia|          48|          1|          107|      358.25|            358.25|
|      Australia|          49|          1|          214|       258.9|            617.15|
|      Australia|          50|          2|          133|      387.95|1005.0999999999999|
|        Austria|          50|          2|            3|      257.04|            257.04|
|        Bahrain|          51|          1|           54|      205.74|            205.74|
|        Belgium|          48|          1|          528|       346.1|             346.1|
|        Belgium|          50|          2|          285|      625.16|            971.26|
|        Belgium|          51|          2|          942|      838.65|1809.9099999999999|
|Channel Islands|    

In [22]:
#Joins
orders_list = [("01", "02", 350, 1),
                   ("01", "04", 580, 1),
                   ("01", "07", 320, 2),
                   ("02", "03", 450, 1),
                   ("02", "06", 220, 1),
                   ("03", "01", 195, 1),
                   ("04", "09", 270, 3),
                   ("04", "08", 410, 2),
                   ("05", "02", 350, 1)]

order_df = spark.createDataFrame(orders_list).toDF("order_id", "prod_id", "unit_price", "qty")

product_list = [("01", "Scroll Mouse", 250, 20),
                    ("02", "Optical Mouse", 350, 20),
                    ("03", "Wireless Mouse", 450, 50),
                    ("04", "Wireless Keyboard", 580, 50),
                    ("05", "Standard Keyboard", 360, 10),
                    ("06", "16 GB Flash Storage", 240, 100),
                    ("07", "32 GB Flash Storage", 320, 50),
                    ("08", "64 GB Flash Storage", 430, 25)]
    
product_df = spark.createDataFrame(product_list).toDF("prod_id", "prod_name", "list_price", "qty")

joined_df = order_df.join(product_df, order_df.prod_id == product_df.prod_id, "left").select("order_id","prod_name","unit_price",order_df.qty)

In [24]:
#Fill Nulls
joined_df = joined_df.withColumn("prod_name",expr("coalesce(prod_name,'No Product Name')"))
joined_df.show()

+--------+-------------------+----------+---+
|order_id|          prod_name|unit_price|qty|
+--------+-------------------+----------+---+
|      01|32 GB Flash Storage|       320|  2|
|      03|       Scroll Mouse|       195|  1|
|      04|    No Product Name|       270|  3|
|      04|64 GB Flash Storage|       410|  2|
|      02|     Wireless Mouse|       450|  1|
|      01|      Optical Mouse|       350|  1|
|      05|      Optical Mouse|       350|  1|
|      02|16 GB Flash Storage|       220|  1|
|      01|  Wireless Keyboard|       580|  1|
+--------+-------------------+----------+---+



In [26]:
#Broadcast join
product_df = product_df.toDF("prod_id", "prod_name", "list_price", "reorder_qty")
product_df = product_df.withColumnRenamed("qty","reorder_qty")
joined_df = order_df.join(broadcast(product_df), order_df.prod_id == product_df.prod_id, "left").select("order_id","prod_name","unit_price",order_df.qty)
joined_df.show()

+--------+-------------------+----------+---+
|order_id|          prod_name|unit_price|qty|
+--------+-------------------+----------+---+
|      01|      Optical Mouse|       350|  1|
|      01|  Wireless Keyboard|       580|  1|
|      01|32 GB Flash Storage|       320|  2|
|      02|     Wireless Mouse|       450|  1|
|      02|16 GB Flash Storage|       220|  1|
|      03|       Scroll Mouse|       195|  1|
|      04|               null|       270|  3|
|      04|64 GB Flash Storage|       410|  2|
|      05|      Optical Mouse|       350|  1|
+--------+-------------------+----------+---+



In [32]:
#Bucket by joins 

#Bucket by the join key
spark.sql("create database if not exists my_db")
order_df.write.bucketBy(3,"prod_id").mode("Overwrite").saveAsTable("my_db.orders_table_2")
product_df.write.bucketBy(3,"prod_id").mode("Overwrite").saveAsTable("my_db.products_table_2")
#joined_df = orders_table_2.join(products_table_2, orders_table_1.prod_id == products_table_1.prod_id, "left").select("order_id","prod_name","unit_price",order_df.qty)