In [1]:
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [2]:
def to_date_df(df, fmt, fld):
    return df.withColumn(fld, to_date(fld, fmt))


if __name__ == "__main__":
    spark = SparkSession \
        .builder \
        .master("local[3]") \
        .appName("RowDemo") \
        .getOrCreate()



    my_schema = StructType([
        StructField("ID", StringType()),
        StructField("EventDate", StringType())])

    my_rows = [Row("123", "04/05/2020"), Row("124", "4/5/2020"), Row("125", "04/5/2020"), Row("126", "4/05/2020")]
    my_rdd = spark.sparkContext.parallelize(my_rows, 2)
    my_df = spark.createDataFrame(my_rdd, my_schema)

    my_df.printSchema()
    my_df.show()
    new_df = to_date_df(my_df, "M/d/y", "EventDate")
    new_df.printSchema()
    new_df.show()

root
 |-- ID: string (nullable = true)
 |-- EventDate: string (nullable = true)

+---+----------+
| ID| EventDate|
+---+----------+
|123|04/05/2020|
|124|  4/5/2020|
|125| 04/5/2020|
|126| 4/05/2020|
+---+----------+

root
 |-- ID: string (nullable = true)
 |-- EventDate: date (nullable = true)

+---+----------+
| ID| EventDate|
+---+----------+
|123|2020-04-05|
|124|2020-04-05|
|125|2020-04-05|
|126|2020-04-05|
+---+----------+



# Read Unstructured File

In [1]:
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [3]:
spark = SparkSession \
    .builder \
    .master("local[3]") \
    .appName("RowDemo") \
    .getOrCreate()
    
df = spark.read.text('apache_log.txt')
df.printSchema()

root
 |-- value: string (nullable = true)



In [9]:
log_reg = r'^(\S+) (\S+) (\S+) \[([\w:/]+\s[+\-]\d{4})\] "(\S+) (\S+) (\S+)" (\d{3}) (\S+) "(\S+)" "([^"]*)'
logs_df = df.select(regexp_extract('value',log_reg,1).alias('ip'),
                    regexp_extract('value',log_reg,4).alias('date'),
                    regexp_extract('value',log_reg,6).alias('request'),
                    regexp_extract('value',log_reg,10).alias('referrer'))
logs_df.printSchema()

root
 |-- ip: string (nullable = true)
 |-- date: string (nullable = true)
 |-- request: string (nullable = true)
 |-- referrer: string (nullable = true)



In [10]:
logs_df.show(5)

+------------+--------------------+--------------------+--------------------+
|          ip|                date|             request|            referrer|
+------------+--------------------+--------------------+--------------------+
|83.149.9.216|17/May/2015:10:05...|/presentations/lo...|http://semicomple...|
|83.149.9.216|17/May/2015:10:05...|/presentations/lo...|http://semicomple...|
|83.149.9.216|17/May/2015:10:05...|/presentations/lo...|http://semicomple...|
|83.149.9.216|17/May/2015:10:05...|/presentations/lo...|http://semicomple...|
|83.149.9.216|17/May/2015:10:05...|/presentations/lo...|http://semicomple...|
+------------+--------------------+--------------------+--------------------+
only showing top 5 rows



In [11]:
logs_df.where("referrer != '-'").withColumn('referrer',substring_index('referrer',"/",3)).groupBy('referrer').count().orderBy(col('count').desc()).show(100)

+--------------------+-----+
|            referrer|count|
+--------------------+-----+
|http://www.semico...| 3038|
|http://semicomple...| 2001|
|http://www.google...|  123|
|https://www.googl...|  105|
|http://stackoverf...|   34|
|http://www.google.fr|   31|
|http://s-chassis....|   29|
| http://logstash.net|   28|
|http://www.google.es|   25|
|https://www.googl...|   23|
|http://www.s-chas...|   22|
|http://www.google.de|   18|
|https://www.googl...|   15|
|http://www.google...|   14|
|https://www.googl...|   13|
|https://www.googl...|   13|
|http://www.google...|   12|
| http://tuxradar.com|   12|
|http://r.duckduck...|   11|
|http://en.wikiped...|   10|
|http://kufli.blog...|   10|
|https://www.googl...|    9|
|http://unix.stack...|    8|
|http://www.google.it|    7|
|https://www.googl...|    7|
|http://www.google.ru|    6|
|http://www.google.ca|    6|
|https://www.googl...|    6|
| http://www.bing.com|    6|
|http://www.google.dk|    6|
|https://www.googl...|    5|
|http://www.li

# Columns Tranforms

In [23]:
airlinesDF = spark.read \
.format("csv") \
.option("header", "true") \
.option("inferSchema","true") \
.load("flights.csv")



In [18]:
airlinesDF.printSchema()

root
 |-- #Airline: string (nullable = true)
 |--  Departure: string (nullable = true)
 |--  Arrival: string (nullable = true)
 |--  Plane: string (nullable = true)



In [13]:
airlinesDF.show(10)

+--------+----------+--------+-----------+
|#Airline| Departure| Arrival|      Plane|
+--------+----------+--------+-----------+
|      NF|       NUS|     VLI|YN2;DHT;BNI|
|      NF|       SON|     LNE|YN2;DHT;BNI|
|      NF|       CCV|     NUS|    YN2;DHT|
|      NF|       VLI|     NUS|    YN2;DHT|
|      NF|       AUY|     TAH|    YN2;BNI|
|      NF|       FTA|     TAH|    YN2;BNI|
|      NF|       LNE|     SON|    YN2;BNI|
|      NF|       TAH|     AUY|    YN2;BNI|
|      NF|       TAH|     AWD|    YN2;BNI|
|      NF|       AWD|     FTA|        YN2|
+--------+----------+--------+-----------+
only showing top 10 rows



In [25]:
airlinesDF.select(column('#Airline'),column('Arrival'),expr("concat(Arrival,'-',Plane) as concCol")).show(10)

+--------+-------+---------------+
|#Airline|Arrival|        concCol|
+--------+-------+---------------+
|      NF|    VLI|VLI-YN2;DHT;BNI|
|      NF|    LNE|LNE-YN2;DHT;BNI|
|      NF|    NUS|    NUS-YN2;DHT|
|      NF|    NUS|    NUS-YN2;DHT|
|      NF|    TAH|    TAH-YN2;BNI|
|      NF|    TAH|    TAH-YN2;BNI|
|      NF|    SON|    SON-YN2;BNI|
|      NF|    AUY|    AUY-YN2;BNI|
|      NF|    AWD|    AWD-YN2;BNI|
|      NF|    FTA|        FTA-YN2|
+--------+-------+---------------+
only showing top 10 rows



In [26]:
airlinesDF.select(column('#Airline'),column('Arrival'),expr("split(Plane,';')[0] as concCol")).show(10)

+--------+-------+-------+
|#Airline|Arrival|concCol|
+--------+-------+-------+
|      NF|    VLI|    YN2|
|      NF|    LNE|    YN2|
|      NF|    NUS|    YN2|
|      NF|    NUS|    YN2|
|      NF|    TAH|    YN2|
|      NF|    TAH|    YN2|
|      NF|    SON|    YN2|
|      NF|    AUY|    YN2|
|      NF|    AWD|    YN2|
|      NF|    FTA|    YN2|
+--------+-------+-------+
only showing top 10 rows



# UDF

In [28]:
surveyDF = spark.read \
.format("csv") \
.option("header", "true") \
.option("inferSchema","true") \
.load("survey.csv")

surveyDF.printSchema()

root
 |-- Timestamp: timestamp (nullable = true)
 |-- Age: long (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- state: string (nullable = true)
 |-- self_employed: string (nullable = true)
 |-- family_history: string (nullable = true)
 |-- treatment: string (nullable = true)
 |-- work_interfere: string (nullable = true)
 |-- no_employees: string (nullable = true)
 |-- remote_work: string (nullable = true)
 |-- tech_company: string (nullable = true)
 |-- benefits: string (nullable = true)
 |-- care_options: string (nullable = true)
 |-- wellness_program: string (nullable = true)
 |-- seek_help: string (nullable = true)
 |-- anonymity: string (nullable = true)
 |-- leave: string (nullable = true)
 |-- mental_health_consequence: string (nullable = true)
 |-- phys_health_consequence: string (nullable = true)
 |-- coworkers: string (nullable = true)
 |-- supervisor: string (nullable = true)
 |-- mental_health_interview: string (nullable = 

In [29]:
surveyDF.show(10)

+-------------------+---+------+--------------+-----+-------------+--------------+---------+--------------+--------------+-----------+------------+----------+------------+----------------+----------+----------+------------------+-------------------------+-----------------------+------------+----------+-----------------------+---------------------+------------------+---------------+--------+
|          Timestamp|Age|Gender|       Country|state|self_employed|family_history|treatment|work_interfere|  no_employees|remote_work|tech_company|  benefits|care_options|wellness_program| seek_help| anonymity|             leave|mental_health_consequence|phys_health_consequence|   coworkers|supervisor|mental_health_interview|phys_health_interview|mental_vs_physical|obs_consequence|comments|
+-------------------+---+------+--------------+-----+-------------+--------------+---------+--------------+--------------+-----------+------------+----------+------------+----------------+----------+----------+--

In [31]:
#fix gender column
import re
def parse_gender(gender):
    female_pattern = r"^f$|f.m|w.m"
    male_pattern = r"^m$|ma|m.l"
    if re.search(female_pattern, gender.lower()):
        return "Female"
    if re.search(male_pattern, gender.lower()):
        return "Male"
    else:
        return "Unknown"

In [36]:
#register UDF function
parse_gender_udf = udf(parse_gender,StringType())
surveyDF2 = surveyDF.withColumn('Gender',parse_gender_udf('Gender'))

In [38]:
surveyDF2.select('Gender').show(10)

+------+
|Gender|
+------+
|Female|
|  Male|
|  Male|
|  Male|
|  Male|
|  Male|
|Female|
|  Male|
|Female|
|  Male|
+------+
only showing top 10 rows



# Misc

In [4]:
spark = SparkSession \
    .builder \
    .master("local[3]") \
    .appName("RowDemo") \
    .getOrCreate()

data_list = [("Ravi", "28", "1", "2002"),
                 ("Abdul", "23", "5", "81"),  # 1981
                 ("John", "12", "12", "6"),  # 2006
                 ("Rosy", "7", "8", "63"),  # 1963
                 ("Abdul", "23", "5", "81")]  # 1981

raw_df = spark.createDataFrame(data_list).toDF("name", "day", "month", "year").repartition(3)
raw_df.printSchema()

root
 |-- name: string (nullable = true)
 |-- day: string (nullable = true)
 |-- month: string (nullable = true)
 |-- year: string (nullable = true)



In [5]:
df1 = raw_df.withColumn('id',monotonically_increasing_id())

df1.show()

+-----+---+-----+----+-----------+
| name|day|month|year|         id|
+-----+---+-----+----+-----------+
| Ravi| 28|    1|2002|          0|
|Abdul| 23|    5|  81|          1|
|Abdul| 23|    5|  81| 8589934592|
| John| 12|   12|   6|17179869184|
| Rosy|  7|    8|  63|17179869185|
+-----+---+-----+----+-----------+



In [7]:
df2 = raw_df.withColumn('year',expr('CASE WHEN year <21 then cast(year+2000 as int) WHEN year < 100 then cast(1900 + year as int) else cast(year as int) end'))
df2.show()

+-----+---+-----+----+
| name|day|month|year|
+-----+---+-----+----+
| Ravi| 28|    1|2002|
|Abdul| 23|    5|1981|
|Abdul| 23|    5|1981|
| John| 12|   12|2006|
| Rosy|  7|    8|1963|
+-----+---+-----+----+



In [14]:
df1 = df2.withColumn('dob',to_date(expr("concat(day,'/',month,'/',year)"),'d/M/y')).drop("day","month","year") \
         .dropDuplicates(["name","dob"]).sort(expr("dob desc"))
df1.show()

+-----+----------+
| name|       dob|
+-----+----------+
| Rosy|1963-08-07|
|Abdul|1981-05-23|
| Ravi|2002-01-28|
| John|2006-12-12|
+-----+----------+



# Aggregations

In [3]:
spark = SparkSession \
    .builder \
    .master("local[3]") \
    .appName("Agg") \
    .getOrCreate()

In [4]:
inv_df = spark.read.format('csv').option('InferSchema','true').option('header','true').load('invoices.csv')

In [5]:
inv_df.show(10)

+---------+---------+--------------------+--------+---------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|    InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+---------------+---------+----------+--------------+
|   536365|     null|WHITE HANGING HEA...|       6|01-12-2010 8.26|     2.55|     17850|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|01-12-2010 8.26|     3.39|     17850|United Kingdom|
|   536365|   84406B|CREAM CUPID HEART...|       8|01-12-2010 8.26|     2.75|     17850|United Kingdom|
|   536365|   84029G|KNITTED UNION FLA...|       6|01-12-2010 8.26|     3.39|     17850|United Kingdom|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|01-12-2010 8.26|     3.39|     17850|United Kingdom|
|   536365|    22752|SET 7 BABUSHKA NE...|       2|01-12-2010 8.26|     7.65|     17850|United Kingdom|
|   536365|    21730|GLASS STAR FROSTE...|       6|01-12-2010 8.

In [31]:
inv_df.printSchema()

root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: string (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: integer (nullable = true)
 |-- Country: string (nullable = true)



In [2]:
from pyspark.sql import functions as f
from pyspark.sql.functions import countDistinct

In [27]:
inv_df.select(f.count("*").alias("Count *"),
             f.sum("Quantity").alias('TotQuantity'),
             f.mean("UnitPrice").alias('MeanPrice'),
             countDistinct('InvoiceNo').alias('CountDistinct')).show()

+-------+-----------+-----------------+-------------+
|Count *|TotQuantity|        MeanPrice|CountDistinct|
+-------+-----------+-----------------+-------------+
| 541909|    5176450|4.611113626088481|        25900|
+-------+-----------+-----------------+-------------+



In [22]:
inv_df.selectExpr("count(1) as `count`",
                 "sum(Quantity)",
                 "avg(UnitPrice) ").show()

+------+-------------+-----------------+
| count|sum(Quantity)|   avg(UnitPrice)|
+------+-------------+-----------------+
|541909|      5176450|4.611113626086849|
+------+-------------+-----------------+



In [28]:
#group by 
summary_df = inv_df.groupBy('Country','InvoiceNo') \
.agg(f.sum("Quantity").alias('TotQuantity')).show()

+--------------+---------+-----------+
|       Country|InvoiceNo|TotQuantity|
+--------------+---------+-----------+
|United Kingdom|   536446|        329|
|United Kingdom|   536508|        216|
|United Kingdom|   537018|         -3|
|United Kingdom|   537401|        -24|
|United Kingdom|   537811|         74|
|United Kingdom|  C537824|         -2|
|United Kingdom|   538895|        370|
|United Kingdom|   540453|        341|
|United Kingdom|   541291|        217|
|United Kingdom|   542551|         -1|
|United Kingdom|   542576|         -1|
|United Kingdom|   542628|          9|
|United Kingdom|   542886|        199|
|United Kingdom|   542907|         75|
|United Kingdom|   543131|        134|
|United Kingdom|   543189|        102|
|United Kingdom|   543265|         -4|
|        Cyprus|   544574|        173|
|United Kingdom|   545077|         24|
|United Kingdom|   545300|        116|
+--------------+---------+-----------+
only showing top 20 rows



In [80]:
temp_df = inv_df.withColumn('InvoiceDate',expr("split(InvoiceDate,' ')[0]"))

temp_df1 = temp_df.withColumn('InvoiceDate',f.to_date('InvoiceDate',"dd-MM-yyyy"))

fin_df = temp_df1.withColumn('WeekNo',f.weekofyear('InvoiceDate'))

sum_df =fin_df.groupBy('Country','WeekNo').agg(f.countDistinct("InvoiceNo").alias('NumInvoices'),
                                               f.sum('Quantity').alias('TotalQuantity'),
                                               f.expr('round(sum(UnitPrice * Quantity),3) as InvoiceValue'))

In [81]:
sum_df.filter(sum_df.Country == "EIRE").orderBy('WeekNo').show()

+-------+------+-----------+-------------+------------+
|Country|WeekNo|NumInvoices|TotalQuantity|InvoiceValue|
+-------+------+-----------+-------------+------------+
|   EIRE|     1|          2|          -17|       25.33|
|   EIRE|     2|          2|         6194|    16759.72|
|   EIRE|     3|          2|         1619|     3222.42|
|   EIRE|     4|          3|          892|     1649.05|
|   EIRE|     5|          4|          812|     1830.43|
|   EIRE|     6|          1|           -4|       -25.8|
|   EIRE|     7|          6|         4052|     7927.03|
|   EIRE|     8|          4|          468|      -57.52|
|   EIRE|     9|          4|         1514|     3716.44|
|   EIRE|    10|          8|         3416|     6162.02|
|   EIRE|    11|         10|         2864|     6533.19|
|   EIRE|    12|          3|          644|     1141.03|
|   EIRE|    13|          4|         1837|     2637.74|
|   EIRE|    14|          5|         2284|     3987.52|
|   EIRE|    15|          3|          741|     1

# Window Function

In [7]:
summary_df = spark.read.parquet('summary.parquet')

In [8]:
summary_df.show(10)

+---------+----------+-----------+-------------+------------+
|  Country|WeekNumber|NumInvoices|TotalQuantity|InvoiceValue|
+---------+----------+-----------+-------------+------------+
|    Spain|        49|          1|           67|      174.72|
|  Germany|        48|         11|         1795|     3309.75|
|Lithuania|        48|          3|          622|     1598.06|
|  Germany|        49|         12|         1852|     4521.39|
|  Bahrain|        51|          1|           54|      205.74|
|  Iceland|        49|          1|          319|      711.79|
|     EIRE|        51|          5|           95|      276.84|
|Australia|        50|          2|          133|      387.95|
|    Italy|        49|          1|           -2|       -17.0|
|     EIRE|        49|          5|         1280|      3284.1|
+---------+----------+-----------+-------------+------------+
only showing top 10 rows



In [9]:
running_total_window = Window.partitionBy("Country") \
    .orderBy('Weeknumber') \
    .rowsBetween(Window.unboundedPreceding,Window.currentRow)
summary_df.withColumn("RunningTotal",f.sum('InvoiceValue').over(running_total_window)).show()

+---------------+----------+-----------+-------------+------------+------------------+
|        Country|WeekNumber|NumInvoices|TotalQuantity|InvoiceValue|      RunningTotal|
+---------------+----------+-----------+-------------+------------+------------------+
|      Australia|        48|          1|          107|      358.25|            358.25|
|      Australia|        49|          1|          214|       258.9|            617.15|
|      Australia|        50|          2|          133|      387.95|1005.0999999999999|
|        Austria|        50|          2|            3|      257.04|            257.04|
|        Bahrain|        51|          1|           54|      205.74|            205.74|
|        Belgium|        48|          1|          528|       346.1|             346.1|
|        Belgium|        50|          2|          285|      625.16|            971.26|
|        Belgium|        51|          2|          942|      838.65|1809.9099999999999|
|Channel Islands|        49|          1|   

# Dataframe Join

In [10]:
spark = SparkSession \
    .builder \
    .master("local[3]") \
    .appName("Join") \
    .getOrCreate()

In [11]:
orders_list = [("01", "02", 350, 1),
               ("01", "04", 580, 1),
               ("01", "07", 320, 2),
               ("02", "03", 450, 1),
               ("02", "06", 220, 1),
               ("03", "01", 195, 1),
               ("04", "09", 270, 3),
               ("04", "08", 410, 2),
               ("05", "02", 350, 1)]

order_df = spark.createDataFrame(orders_list).toDF("order_id", "prod_id", "unit_price", "qty")

product_list = [("01", "Scroll Mouse", 250, 20),
                ("02", "Optical Mouse", 350, 20),
                ("03", "Wireless Mouse", 450, 50),
                ("04", "Wireless Keyboard", 580, 50),
                ("05", "Standard Keyboard", 360, 10),
                ("06", "16 GB Flash Storage", 240, 100),
                ("07", "32 GB Flash Storage", 320, 50),
                ("08", "64 GB Flash Storage", 430, 25)]

product_df = spark.createDataFrame(product_list).toDF("prod_id", "prod_name", "list_price", "qty")


In [17]:
order_df.join(product_df,order_df.prod_id == product_df.prod_id, 'inner') \
    .drop(product_df.qty).show()
    #.select('product_df.order_id','product_df.unit_price') \

+--------+-------+----------+---+-------+-------------------+----------+
|order_id|prod_id|unit_price|qty|prod_id|          prod_name|list_price|
+--------+-------+----------+---+-------+-------------------+----------+
|      03|     01|       195|  1|     01|       Scroll Mouse|       250|
|      01|     02|       350|  1|     02|      Optical Mouse|       350|
|      05|     02|       350|  1|     02|      Optical Mouse|       350|
|      02|     03|       450|  1|     03|     Wireless Mouse|       450|
|      01|     04|       580|  1|     04|  Wireless Keyboard|       580|
|      02|     06|       220|  1|     06|16 GB Flash Storage|       240|
|      01|     07|       320|  2|     07|32 GB Flash Storage|       320|
|      04|     08|       410|  2|     08|64 GB Flash Storage|       430|
+--------+-------+----------+---+-------+-------------------+----------+



In [25]:
#left join

order_df.join(product_df,order_df.prod_id == product_df.prod_id, 'left') \
    .drop('product_df.qty','product_df.prod_id') \
    .withColumn("prod_name",expr("COALESCE(prod_name,order_df.prod_id)"))
    

IndentationError: unexpected indent (<ipython-input-25-709dff170757>, line 5)

# Join Optimization

In [7]:
from pyspark.sql.functions import broadcast
spark = SparkSession \
    .builder \
    .master("local[3]") \
    .appName("Join") \
    .getOrCreate()

In [4]:
f_df1 = spark.read.json("d1")
f_df2 = spark.read.json("d2")

In [5]:
spark.conf.set("spark.sql.shuffle.partitions",3)


In [8]:
#look for brodcast join
#works when one df is small
join_exp = f_df1.id == f_df2.id
join_df = f_df1.join(broadcast(f_df2),join_exp,'inner')

In [9]:
#bucket the dataframe in case of large dataset

f_df1.coalesce(1).write \
    .bucketBy(3,"id").saveAsTable("flight_df1")

In [10]:
f_df2.coalesce(1).write \
    .bucketBy(3,"id").saveAsTable("flight_df2")

In [16]:
db_df1 = spark.read.table("flight_df1")

db_df2 = spark.read.table('flight_df2')

In [18]:
spark.conf.set("spark.sql.autoBroadcastJoinThreshold",-1)


join_exp = db_df1.id == db_df2.id
join_df = db_df1.join(db_df2,join_exp,'inner')

In [None]:
sum_df.show()

In [45]:
temp_df.printSchema()

root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: string (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: integer (nullable = true)
 |-- Country: string (nullable = true)

