<a href="https://colab.research.google.com/github/sku1978/sk-share-repo/blob/main/Spark/SparkDataFrame/SparkDataFrameNotebook.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!apt-get install openjdk-8-jdk-headless -qq  > /dev/null 
!wget -q https://downloads.apache.org/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
!tar xf spark-3.1.1-bin-hadoop3.2.tgz > /dev/null 
!pip install -q findspark

In [None]:
!mkdir /content/conf /content/lib
!wget -O /content/conf/log4j.properties https://raw.githubusercontent.com/sku1978/sk-share-repo/main/Spark/SparkDataFrame/conf/log4j.properties > /dev/null 2>&1
!mv /content/spark-3.1.1-bin-hadoop3.2/conf/spark-defaults.conf /content/spark-3.1.1-bin-hadoop3.2/conf/spark-defaults.conf.bk  > /dev/null 2>&1
!wget -O /content/spark-3.1.1-bin-hadoop3.2/conf/spark-defaults.conf https://raw.githubusercontent.com/sku1978/sk-share-repo/main/Spark/SparkDataFrame/conf/spark-defaults.conf  > /dev/null 2>&1
!wget -O /content/conf/spark.conf https://raw.githubusercontent.com/sku1978/sk-share-repo/main/Spark/SparkDataFrame/conf/spark.conf > /dev/null 2>&1

!wget -O /content/lib/logger.py https://raw.githubusercontent.com/sku1978/sk-share-repo/main/Spark/SparkDataFrame/lib/logger.py  > /dev/null 2>&1
!wget -O /content/lib/utils.py https://raw.githubusercontent.com/sku1978/sk-share-repo/main/Spark/SparkDataFrame/lib/utils.py  > /dev/null 2>&1

In [None]:
import os

# Tell findspark (and pyspark) where the JDK and the unpacked Spark distribution live.
os.environ.update({
    "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
    "SPARK_HOME": "/content/spark-3.1.1-bin-hadoop3.2",
})

import findspark

# init() makes the Spark installation importable; find() is left as the cell's
# last expression so the resolved Spark home is echoed.
findspark.init()
findspark.find()

**Spark UI section**
<br>To use the Spark UI, uncomment the cells below and open the public URL reported by the tunnel API.

In [None]:
# Optional: expose the Spark UI through an ngrok tunnel (uncomment to use).
# NOTE(review): the tunnel targets port 4050, not Spark's default UI port 4040;
# presumably spark.ui.port is set to 4050 in the downloaded spark-defaults.conf — confirm.
#!wget https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip
#!unzip ngrok-stable-linux-amd64.zip
#get_ipython().system_raw('./ngrok http 4050 &')

In [None]:
# Optional: query the local /api/tunnels endpoint (presumably ngrok's inspection API on 4040) to get the public URL.
#!curl -s http://localhost:4040/api/tunnels

**Main Section**

In [None]:
from pyspark.sql import *
from pyspark import SparkConf, SparkFiles
from lib.logger import Log4J
from lib.utils import get_spark_app_config

# Build (or reuse) a Hive-enabled SparkSession from the app config returned by
# lib.utils, then wrap its JVM logger with the Log4J helper.
conf = get_spark_app_config()

session_builder = SparkSession.builder.config(conf=conf).enableHiveSupport()
spark = session_builder.getOrCreate()

logger = Log4J(spark)

**Basic CSV Read**

In [None]:
def load_csv_file(spark, url, file_name=None):
   """Fetch a CSV file with SparkContext.addFile and read it into a DataFrame.

   The header row provides column names, column types are inferred, and any
   malformed record aborts the read (FAILFAST).

   Args:
      spark: Active SparkSession.
      url: URL (or path) of the CSV file to distribute and read.
      file_name: Name under which addFile stores the file. Defaults to the last
         path component of ``url`` (e.g. ``sample.csv``).

   Returns:
      The loaded DataFrame.
   """
   import os
   from urllib.parse import urlparse

   spark.sparkContext.addFile(url)

   # addFile registers the file under the basename of its path. The file name used
   # to be hard-coded to "sample.csv", so any other URL read the wrong (or no) file.
   if file_name is None:
      file_name = os.path.basename(urlparse(url).path)

   survey_df = spark.read \
      .format("csv") \
      .option("header", "true") \
      .option("inferSchema", "true") \
      .option("mode", "FAILFAST") \
      .load('file://' + SparkFiles.get(file_name))

   return survey_df

def count_by_country(survey_df):
    """Count respondents aged 40 or younger per country.

    Args:
        survey_df: DataFrame containing at least Age, Gender, Country and State.

    Returns:
        DataFrame with one row per Country and its ``count``.
    """
    projected = survey_df.select("Age", "Gender", "Country", "State")
    under_41 = projected.where("Age <= 40")
    return under_41.groupBy("Country").count()

In [None]:
logger.info("Start CSV Load")

url = 'https://raw.githubusercontent.com/sku1978/sk-share-repo/main/Spark/SparkDataFrame/data/sample.csv'

# Load the sample survey, spread it over two partitions, then aggregate per country.
survey_df = load_csv_file(spark, url)
partitioned_survey_df = survey_df.repartition(2)
count_df = count_by_country(partitioned_survey_df)

# collect() brings the aggregated rows back to the driver so they can be logged.
logger.info(count_df.collect())

logger.info("End CSV Load")

**Data Types**<br>
(https://spark.apache.org/docs/3.1.1/api/python/reference/pyspark.sql.html#data-types)

**CSV Read with an explicit schema (StructType)**

In [None]:
from pyspark.sql.types import StructType, StructField, IntegerType, DateType, StringType

logger.info("Start Flight Time CSV Load")

url = 'https://raw.githubusercontent.com/sku1978/sk-share-repo/main/Spark/SparkDataFrame/data/flight-time.csv'
spark.sparkContext.addFile(url)

# Column name -> Spark type, in file order; every field is declared nullable.
flight_columns = [
    ("FL_DATE", DateType()),
    ("OP_CARRIER", StringType()),
    ("OP_CARRIER_FL_NUM", IntegerType()),
    ("ORIGIN", StringType()),
    ("ORIGIN_CITY_NAME", StringType()),
    ("DEST", StringType()),
    ("DEST_CITY_NAME", StringType()),
    ("CRS_DEP_TIME", IntegerType()),
    ("DEP_TIME", IntegerType()),
    ("WHEELS_ON", IntegerType()),
    ("TAXI_IN", IntegerType()),
    ("CRS_ARR_TIME", IntegerType()),
    ("ARR_TIME", IntegerType()),
    ("CANCELLED", IntegerType()),
    ("DISTANCE", StringType()),
]
flightSchemaStruct = StructType([StructField(name, dtype, True) for name, dtype in flight_columns])

# Explicit schema instead of inference; FL_DATE text is parsed with "M/d/y"
# and FAILFAST aborts on the first malformed row.
flight_time_df = (spark.read
                  .format("csv")
                  .option("header", "true")
                  .option("mode", "FAILFAST")
                  .option("dateFormat", "M/d/y")
                  .schema(flightSchemaStruct)
                  .load('file://' + SparkFiles.get("flight-time.csv")))

flight_time_df.show()

logger.info("End CSV Load")

**JSON Read**

In [None]:
logger.info("Start Flight Time JSON Load")

url = 'https://raw.githubusercontent.com/sku1978/sk-share-repo/main/Spark/SparkDataFrame/data/flight-time.json'
spark.sparkContext.addFile(url)

# The same flight schema, this time written as a DDL string rather than a StructType.
flightSchemaDDL = """FL_DATE DATE, 
                     OP_CARRIER STRING, 
                     OP_CARRIER_FL_NUM INT,
                     ORIGIN STRING,
                     ORIGIN_CITY_NAME STRING,
                     DEST STRING,
                     DEST_CITY_NAME STRING,
                     CRS_DEP_TIME INT,
                     DEP_TIME INT,
                     WHEELS_ON INT,
                     TAXI_IN INT,
                     CRS_ARR_TIME INT,
                     ARR_TIME INT,
                     CANCELLED INT,
                     DISTANCE STRING"""

# Configure the JSON reader first, then point it at the downloaded file.
json_reader = spark.read.format("json").option("dateFormat", "M/d/y").schema(flightSchemaDDL)
flight_time_df = json_reader.load('file://' + SparkFiles.get("flight-time.json"))

flight_time_df.show()

logger.info("End JSON Load")

**Parquet Read**

In [None]:
logger.info("Start Flight Time Parquet Load")

url = 'https://raw.githubusercontent.com/sku1978/sk-share-repo/main/Spark/SparkDataFrame/data/flight-time.parquet'
spark.sparkContext.addFile(url)

# Parquet files carry their own schema, so no schema/inference option is needed.
# ("inferSchema" is a CSV-source option; the parquet reader silently ignored it.)
flight_time_df = spark.read \
                    .format("parquet") \
                    .load('file://' + SparkFiles.get("flight-time.parquet"))

flight_time_df.show()

logger.info("End Parquet Load")

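**Write Avro file**<br>
Avro support is an external Spark module, so the following line was added <br>
spark.jars.packages                org.apache.spark:spark-avro_2.12:3.0.0
<br>to<br>
conf/spark-defaults.conf (the spark-avro version should normally match the installed Spark version, 3.1.1)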

In [None]:
from pyspark.sql.functions  import spark_partition_id

logger.info("Start Flight Time Parquet Load")

url = 'https://raw.githubusercontent.com/sku1978/sk-share-repo/main/Spark/SparkDataFrame/data/flight-time.parquet'
spark.sparkContext.addFile(url)

# Parquet is self-describing; the CSV-only "inferSchema" option is not needed here.
flight_time_df = spark.read \
                    .format("parquet") \
                    .load('file://' + SparkFiles.get("flight-time.parquet"))

logger.info("End Parquet Load")
logger.info("Number of partitions: " + str(flight_time_df.rdd.getNumPartitions()))
# The partition count can exceed the number of partitions that actually hold rows:
# a small input may put every record into a single partition (visible in the
# per-partition counts below). Use repartition(n) to spread rows across more output files.
logger.info("Records per partition: " + str(flight_time_df.groupBy(spark_partition_id()).count().collect()))

logger.info("Start Avro write")

# The "avro" format needs the spark-avro package on the classpath (spark.jars.packages).
flight_time_df.write \
              .format("avro") \
              .mode("overwrite") \
              .option("path", "dataSink/avro/") \
              .save()

logger.info("End Avro write")

In [None]:
# List the Avro part files written to dataSink/avro/ by the previous cell.
!ls -l dataSink/avro/

**Partitioned Write**

In [None]:
logger.info("Start Flight Time Parquet Load")

url = 'https://raw.githubusercontent.com/sku1978/sk-share-repo/main/Spark/SparkDataFrame/data/flight-time.parquet'
spark.sparkContext.addFile(url)

# Parquet is self-describing; the CSV-only "inferSchema" option is not needed here.
flight_time_df = spark.read \
                    .format("parquet") \
                    .load('file://' + SparkFiles.get("flight-time.parquet"))

logger.info("Start Partitioned JSON write")

# Writes one directory per OP_CARRIER=.../ORIGIN=... pair and caps each output
# file at 10,000 records. To check the file split, run for example:
#!wc -l dataSink/json/OP_CARRIER\=DL/ORIGIN\=ATL/*
flight_time_df.write \
              .format("json") \
              .mode("overwrite") \
              .option("path", "dataSink/json/") \
              .partitionBy("OP_CARRIER", "ORIGIN") \
              .option("maxRecordsPerFile", 10000) \
              .save()

logger.info("End Partitioned JSON write")

In [None]:
# Top level of the partitioned output: one OP_CARRIER=... directory per carrier (each holding ORIGIN=... subdirectories).
!ls -l dataSink/json/

**Managed tables**<br>
The config has been set up to write tables into the *warehouse_location* folder.

In [None]:
logger.info("Start Flight Time Parquet Load")

url = 'https://raw.githubusercontent.com/sku1978/sk-share-repo/main/Spark/SparkDataFrame/data/flight-time.parquet'
spark.sparkContext.addFile(url)

# Parquet is self-describing; the CSV-only "inferSchema" option is not needed here.
flight_time_df = spark.read \
                    .format("parquet") \
                    .load('file://' + SparkFiles.get("flight-time.parquet"))

logger.info("Create database")
spark.sql("CREATE DATABASE IF NOT EXISTS AIRLINES_DB")
spark.catalog.setCurrentDatabase("AIRLINES_DB")

logger.info("Write table")
# saveAsTable creates a managed table in the current database (AIRLINES_DB),
# hashing rows on (OP_CARRIER, ORIGIN) into 5 buckets, sorted by the same keys.
flight_time_df.write \
              .mode("overwrite") \
              .bucketBy(5, "OP_CARRIER", "ORIGIN") \
              .sortBy("OP_CARRIER", "ORIGIN") \
              .saveAsTable("flight_data_tbl")

spark.sql("SELECT * FROM AIRLINES_DB.flight_data_tbl").show(10)

logger.info(spark.catalog.listTables("AIRLINES_DB"))

In [None]:
# Recursively list the warehouse directory holding the managed table's bucket files.
# NOTE(review): the markdown mentions a *warehouse_location* folder while this lists
# spark-warehouse/ — confirm which directory spark.sql.warehouse.dir points to.
!ls -lR spark-warehouse/

**Log File/Raw Data/Unstructured Data handling**

In [None]:
from pyspark.sql.functions import *

logger.info("Start Log file load")

url = 'https://raw.githubusercontent.com/sku1978/sk-share-repo/main/Spark/SparkDataFrame/data/apache_logs.txt'
spark.sparkContext.addFile(url)

# The text source produces one row per line in a single string column, "value".
raw_df = (spark.read
          .format("text")
          .load('file://' + SparkFiles.get("apache_logs.txt")))

raw_df.printSchema()

# Apache access-log style line: ip, identity, user, [timestamp], "method path protocol",
# status, size, "referrer", "user agent".
log_reg = r'^(\S+) (\S+) (\S+) \[([\w:/]+\s[+\-]\d{4})\] "(\S+) (\S+) (\S+)" (\d{3}) (\S+) "(\S+)" "([^"]*)'

# Output column name -> regex capture-group number.
wanted_groups = [('ip', 1), ('date', 4), ('request', 6), ('referrer', 10)]
logs_df = raw_df.select(*[regexp_extract('value', log_reg, group_no).alias(column_name)
                          for column_name, group_no in wanted_groups])
logs_df.printSchema()

# Reduce each referrer to "scheme://host", drop "-" (no referrer) and count hits per site.
logs_df.withColumn('referrer', substring_index('referrer', '/', 3)) \
       .where("trim(referrer) != '-' ") \
       .groupBy('referrer') \
       .count() \
       .show(100, truncate=False)

**Handcrafted DataFrame from Row objects**

In [None]:
from pyspark.sql.types import *

# Two string columns; EventDate is turned into a real date further below.
my_schema = StructType([
    StructField("ID", StringType(), True),
    StructField("EventDate", StringType(), True),
])

# (ID, EventDate) pairs with dates written as day/month/year.
event_pairs = [
    ("123", "21/03/2019"),
    ("234", "21/05/2029"),
    ("345", "01/02/2020"),
    ("456", "09/12/2018"),
    ("567", "03/06/2019"),
]
my_rows = [Row(*pair) for pair in event_pairs]

# Spread the rows over 2 partitions and attach the explicit schema.
my_rdd = spark.sparkContext.parallelize(my_rows, 2)
my_df = spark.createDataFrame(my_rdd, my_schema)

my_df.printSchema()
my_df.show()

# 'd/M/y' parses the day/month/year strings into DateType values.
new_df = my_df.withColumn('EventDate', to_date('EventDate', 'd/M/y'))

new_df.printSchema()
new_df.show()

**Column Expressions**

In [None]:
from pyspark.sql.types import StructType, StructField, IntegerType, DateType, StringType
from pyspark.sql.functions import *

logger.info("Start CSV Load")

url = 'https://raw.githubusercontent.com/sku1978/sk-share-repo/main/Spark/SparkDataFrame/data/databricks-airlines.csv'
spark.sparkContext.addFile(url)

# Infer column types from a 50% sample of the rows (samplingRatio) to speed up schema inference.
airlines_df = (spark.read
               .format("csv")
               .option("header", "true")
               .option("mode", "FAILFAST")
               .option("inferSchema", "true")
               .option("samplingRatio", "0.5")
               .load('file://' + SparkFiles.get("databricks-airlines.csv")))

# Three equivalent ways to reference a column: a name string, col(), and DataFrame attribute access.
airlines_df.select("Origin", col("Dest"), airlines_df.IsArrDelayed).show(10)

In [None]:
airlines_df.select("Origin", "Dest", "Distance", "Year", "Month", "DayOfMonth").show(10)

# Month and DayOfMonth are numeric and not zero-padded, so a plain concat() builds
# strings like "198715" for 1987-01-05 that cannot match 'yyyyMMdd'. Joining the
# parts with a separator and parsing with single-letter fields ('y-M-d', as with
# the 'd/M/y' patterns elsewhere in this notebook) handles 1- and 2-digit values.

# SQL-expression form
airlines_df.select("Origin", "Dest", "Distance", expr("to_date(concat_ws('-', Year, Month, DayOfMonth), 'y-M-d') as FlightDate")).show(10)

# Column-function form
airlines_df.select("Origin", "Dest", "Distance", to_date(concat_ws("-", "Year", "Month", "DayOfMonth"), 'y-M-d').alias("FlightDate")).show(10)

**User-Defined Function (UDF)**<br>
to be used as a column expression

In [None]:
import re
def parse_gender(gender):
  """Normalise a free-text gender answer to "Female", "Male" or "Unknown".

  Matching is case-insensitive and ignores surrounding whitespace. The female
  patterns are tried first because words like "female" or "woman" also contain
  the male fragments ("ma", "m.l").

  Args:
    gender: Raw survey value; None when the source cell is NULL.

  Returns:
    "Female", "Male" or "Unknown".
  """
  # Spark hands SQL NULLs to Python UDFs as None; calling .lower() on None would
  # raise inside the executor and fail the whole job.
  if gender is None:
    return "Unknown"

  female_pattern = r'^f$|f.m|w.m'
  male_pattern = r'^m$|ma|m.l'

  normalized = str(gender).strip().lower()
  if re.search(female_pattern, normalized):
    return "Female"
  if re.search(male_pattern, normalized):
    return "Male"
  return "Unknown"

In [None]:
url = 'https://raw.githubusercontent.com/sku1978/sk-share-repo/main/Spark/SparkDataFrame/data/survey.csv'
spark.sparkContext.addFile(url)

survey_df = (spark.read
             .format("csv")
             .option("header", "true")
             .option("mode", "FAILFAST")
             .option("inferSchema", "true")
             .load('file://' + SparkFiles.get("survey.csv")))

survey_df.show(10)

# Option 1: wrap the Python function as a Column-level UDF. It is used through the
# returned object only; nothing is registered in the catalog.
parse_gender_udf = udf(parse_gender, StringType())
survey_df2 = survey_df.withColumn("Gender", parse_gender_udf("Gender"))
survey_df2.show(10)

# Option 2: register it with the session under a name so SQL expressions can call
# it; this registration is what appears in the function catalog.
spark.udf.register("parse_gender_udf", parse_gender, StringType())
survey_df3 = survey_df.withColumn("Gender", expr("parse_gender_udf(Gender)"))
survey_df3.show(10)

spark.catalog.listFunctions()

**Miscellaneous functions**

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, monotonically_increasing_id, when, expr
from pyspark.sql.types import *

# (name, day, month, year) as strings; two- and one-digit years are fixed up later.
# The last row repeats the second one (it is removed by dropDuplicates further down).
data_list = [("Ravi", "28", "1", "2002"),
             ("Abdul", "23", "5", "81"),  # 1981
             ("John", "12", "12", "6"),  # 2006
             ("Rosy", "7", "8", "63"),  # 1963
             ("Abdul", "23", "5", "81")]  # 1981

column_names = ["name", "day", "month", "year"]
raw_df = spark.createDataFrame(data_list).toDF(*column_names)
raw_df.printSchema()

In [None]:
#Add a new column

# Surrogate key: monotonically increasing and unique, but not necessarily consecutive.
id_column = monotonically_increasing_id()
df1 = raw_df.withColumn("id", id_column)
df1.show(10)

In [None]:
# CASE WHEN and CAST
df2=df1.withColumn("year", expr("""
                                CASE WHEN year < 21  THEN cast(year as int) + 2000
                                     WHEN year < 100 THEN cast(year as int) + 1900
                                     ELSE cast(year as int)
                                END
                                """))
df2.show()

df2=df1.withColumn("year", expr("""
                                CASE WHEN year < 21  THEN year + 2000
                                     WHEN year < 100 THEN year + 1900
                                     ELSE year
                                END
                                """).cast(IntegerType()))
df2.show()


In [None]:
# Cast the fields and alternative WHEN
df3=df1.withColumn("day", col("day").cast(IntegerType())) \
       .withColumn("month", col("month").cast(IntegerType())) \
       .withColumn("year", col("year").cast(IntegerType())) \

df3.printSchema()

df4=df3.withColumn("year", \
                          when(col("year") < 21, col("year") + 2000) \
                        .when(col("year") < 100, col("year") + 1900) \
                        .otherwise(col("year")))
df4.show()

**Add/Remove columns**

In [None]:
# Assemble a DOB date from the parts, drop the parts, and remove duplicate people.
dob_column = expr("to_date(concat(day, '/', month, '/', year), 'd/M/y')")
df5 = (df2.withColumn("dob", dob_column)
          .drop("day", "month", "year")
          .dropDuplicates(["name", "dob"]))

df5.sort(df5.dob.desc()).show()

**Aggregation**

In [None]:
from pyspark.sql import functions as f
url = 'https://raw.githubusercontent.com/sku1978/sk-share-repo/main/Spark/SparkDataFrame/data/invoices.csv'
spark.sparkContext.addFile(url)

invoice_df = (spark.read
              .format("csv")
              .option("header", "true")
              .option("mode", "FAILFAST")
              .option("inferSchema", "true")
              .load('file://' + SparkFiles.get("invoices.csv")))

invoice_df.show(10)

# 1) Whole-frame aggregates built from Column functions.
column_aggregates = [
    f.count("*").alias("Count *"),
    f.sum("Quantity").alias("TotalQuantity"),
    f.avg("UnitPrice").alias("AvgPrice"),
    f.countDistinct("InvoiceNo").alias("CountDistinct"),
]
invoice_df.select(*column_aggregates).show()

# 2) The same idea with SQL expression strings; count(StockCode) skips NULLs,
#    count(1) counts every row.
sql_aggregates = [
    "count(1) as `count 1`",
    "count(StockCode) as `count field`",
    "sum(Quantity) as TotalQuantity",
    "avg(UnitPrice) as AvgPrice",
]
invoice_df.selectExpr(*sql_aggregates).show()

# 3) Grouped aggregation in Spark SQL over a temporary view.
invoice_df.createOrReplaceTempView("sales")
summary_sql = spark.sql("""
      SELECT Country, InvoiceNo,
            sum(Quantity) as TotalQuantity,
            round(sum(Quantity*UnitPrice),2) as InvoiceValue
      FROM sales
      GROUP BY Country, InvoiceNo""")
summary_sql.show()

# 4) The same grouping with the DataFrame API, mixing Column functions and expr().
grouped_invoices = invoice_df.groupBy("Country", "InvoiceNo")
summary_df = grouped_invoices.agg(
    f.sum("Quantity").alias("TotalQuantity"),
    f.round(f.sum(f.expr("Quantity * UnitPrice")), 2).alias("InvoiceValue"),
    f.expr("round(sum(Quantity * UnitPrice),2) as InvoiceValueExpr"),
)
summary_df.show()

**Group By**

In [None]:
from pyspark.sql import functions as f
url = 'https://raw.githubusercontent.com/sku1978/sk-share-repo/main/Spark/SparkDataFrame/data/invoices.csv'
spark.sparkContext.addFile(url)

invoice_df = (spark.read
              .format("csv")
              .option("header", "true")
              .option("mode", "FAILFAST")
              .option("inferSchema", "true")
              .load('file://' + SparkFiles.get("invoices.csv")))

invoice_df.show(10)

# Reusable aggregate columns for the weekly summary.
NumInvoices = f.countDistinct("InvoiceNo").alias("NumInvoices")
TotalQuantity = f.sum("Quantity").alias("TotalQuantity")
InvoiceValue = f.expr("round(sum(Quantity * UnitPrice),2) as InvoiceValue")

# Parse InvoiceDate, keep only 2010 invoices, and tag each row with its week of the year.
invoices_2010_df = (invoice_df
                    .withColumn("InvoiceDate", f.to_date(f.col("InvoiceDate"), "dd-MM-yyyy H.mm"))
                    .where("year(InvoiceDate) == 2010")
                    .withColumn("WeekNumber", f.weekofyear(f.col("InvoiceDate"))))

exSummary_df = invoices_2010_df.groupBy("Country", "WeekNumber").agg(NumInvoices, TotalQuantity, InvoiceValue)

exSummary_df.sort("Country", "WeekNumber").show()

# coalesce(1) collapses to one partition so a single parquet part file is written.
(exSummary_df.coalesce(1)
    .write
    .format("parquet")
    .mode("overwrite")
    .save("output"))

In [None]:
# Inspect the parquet output; coalesce(1) in the previous cell means a single part file is expected.
!ls -l output/

**Windowing**

In [None]:
from pyspark.sql import functions as f
url = 'https://raw.githubusercontent.com/sku1978/sk-share-repo/main/Spark/SparkDataFrame/data/summary.parquet'
spark.sparkContext.addFile(url)

summary_df = spark.read.format("parquet").load('file://' + SparkFiles.get("summary.parquet"))

summary_df.sort("Country", "WeekNumber").show()

# Cumulative frame per country: from its first week up to and including the current row.
running_total_window = (Window.partitionBy("Country")
                        .orderBy("WeekNumber")
                        .rowsBetween(Window.unboundedPreceding, Window.currentRow))

running_total_df = summary_df.withColumn("RunningTotal",
                                         f.sum("InvoiceValue").over(running_total_window))
running_total_df.sort("Country", "WeekNumber").show()

**Ranking**

In [None]:
from pyspark.sql import functions as f
url = 'https://raw.githubusercontent.com/sku1978/sk-share-repo/main/Spark/SparkDataFrame/data/summary.parquet'
spark.sparkContext.addFile(url)

summary_df = spark.read \
                    .format("parquet") \
                    .load('file://' + SparkFiles.get("summary.parquet"))

summary_df.sort("Country", "WeekNumber").show()

# Highest InvoiceValue first within each country. Ranking functions always use the
# (unbounded preceding .. current row) frame, so an explicit rowsBetween is redundant.
rank_window = Window.partitionBy("Country") \
        .orderBy(f.col("InvoiceValue").desc())

# Keep the top two ranks per country. Bind the DataFrame before calling show():
# show() returns None, so the original `df = ... .show()` left `df` as None.
df = summary_df.withColumn("Rank", f.dense_rank().over(rank_window)) \
    .where(f.col("Rank") <= 2) \
    .sort("Country", "Rank")
df.show()

**Joins**

In [None]:
    # (order_id, prod_id, unit_price, qty)
    orders_list = [("01", "02", 350, 1),
                   ("01", "04", 580, 1),
                   ("01", "07", 320, 2),
                   ("02", "03", 450, 1),
                   ("02", "06", 220, 1),
                   ("03", "01", 195, 1),
                   ("04", "09", 270, 3),
                   ("04", "08", 410, 2),
                   ("05", "02", 350, 1)]

    order_df = spark.createDataFrame(orders_list).toDF("order_id", "prod_id", "unit_price", "qty")

    # (prod_id, prod_name, list_price, qty)
    product_list = [("01", "Scroll Mouse", 250, 20),
                    ("02", "Optical Mouse", 350, 20),
                    ("03", "Wireless Mouse", 450, 50),
                    ("04", "Wireless Keyboard", 580, 50),
                    ("05", "Standard Keyboard", 360, 10),
                    ("06", "16 GB Flash Storage", 240, 100),
                    ("07", "32 GB Flash Storage", 320, 50),
                    ("08", "64 GB Flash Storage", 430, 25)]

    product_df = spark.createDataFrame(product_list).toDF("prod_id", "prod_name", "list_price", "qty")

    product_df.show()
    order_df.show()

    # Both frames have a "qty" column; rename the product one so "qty" in the
    # result unambiguously refers to the ordered quantity.
    product_renamed_df = product_df.withColumnRenamed("qty", "reorder_qty")
    join_expr = order_df.prod_id == product_df.prod_id

    output_columns = ["order_id", "prod_id", "prod_name", "unit_price", "list_price", "qty"]

    # Inner join keeps only orders whose product exists; drop the right-hand prod_id duplicate.
    joined_df = order_df.join(product_renamed_df, join_expr, "inner")
    joined_df.drop(product_renamed_df.prod_id) \
        .select(*output_columns) \
        .show()

In [None]:
    # (order_id, prod_id, unit_price, qty)
    orders_list = [("01", "02", 350, 1),
                   ("01", "04", 580, 1),
                   ("01", "07", 320, 2),
                   ("02", "03", 450, 1),
                   ("02", "06", 220, 1),
                   ("03", "01", 195, 1),
                   ("04", "09", 270, 3),
                   ("04", "08", 410, 2),
                   ("05", "02", 350, 1)]

    order_df = spark.createDataFrame(orders_list).toDF("order_id", "prod_id", "unit_price", "qty")

    # (prod_id, prod_name, list_price, qty)
    product_list = [("01", "Scroll Mouse", 250, 20),
                    ("02", "Optical Mouse", 350, 20),
                    ("03", "Wireless Mouse", 450, 50),
                    ("04", "Wireless Keyboard", 580, 50),
                    ("05", "Standard Keyboard", 360, 10),
                    ("06", "16 GB Flash Storage", 240, 100),
                    ("07", "32 GB Flash Storage", 320, 50),
                    ("08", "64 GB Flash Storage", 430, 25)]

    product_df = spark.createDataFrame(product_list).toDF("prod_id", "prod_name", "list_price", "qty")

    product_df.show()
    order_df.show()

    # Rename the product-side "qty" so it does not clash with the ordered quantity.
    product_renamed_df = product_df.withColumnRenamed("qty", "reorder_qty")
    join_expr = order_df.prod_id == product_df.prod_id

    output_columns = ["order_id", "prod_id", "prod_name", "unit_price", "list_price", "qty"]

    # Left join keeps every order; for orders with no matching product (e.g. prod_id "09")
    # fall back to the prod_id as the name and the unit_price as the list price.
    joined_df = order_df.join(product_renamed_df, join_expr, "left")
    joined_df.drop(product_renamed_df.prod_id) \
        .select(*output_columns) \
        .withColumn("prod_name", expr("coalesce(prod_name, prod_id)")) \
        .withColumn("list_price", expr("coalesce(list_price, unit_price)")) \
        .sort("order_id") \
        .show()

**View Log**

In [None]:
# Print the application log file; presumably its location is configured in the downloaded log4j.properties — confirm.
!cat app-logs/sparklog.log