In [1]:
from pyspark.sql.functions import count, countDistinct, approx_count_distinct

In [2]:
from pyspark.sql import SparkSession

In [3]:
spark = SparkSession.builder.master("local").appName("read_df").config("spark.some.config.option","some-value").getOrCreate()

In [4]:
df = spark.read.format("csv")\
        .option("header", "true")\
        .option("inferSchema", "true")\
        .load(r"D:\New\Spark-The-Definitive-Guide\data\retail-data\all\online-retail-dataset.csv")

In [5]:
df.cache()

DataFrame[InvoiceNo: string, StockCode: string, Description: string, Quantity: int, InvoiceDate: string, UnitPrice: double, CustomerID: int, Country: string]

In [6]:
df.createOrReplaceTempView("dfTable")

In [7]:
df.select(count("StockCode")).show()

+----------------+
|count(StockCode)|
+----------------+
|          541909|
+----------------+



In [8]:
df.count()

541909

In [9]:
df.select(countDistinct("StockCode")).show()

+-------------------------+
|count(DISTINCT StockCode)|
+-------------------------+
|                     4070|
+-------------------------+



In [10]:
df.select(approx_count_distinct("StockCode",0.1)).show()

+--------------------------------+
|approx_count_distinct(StockCode)|
+--------------------------------+
|                            3364|
+--------------------------------+



In [11]:
from pyspark.sql.functions import first, last

In [12]:
df.select(first("StockCode"), last("StockCode")).show()

+----------------+---------------+
|first(StockCode)|last(StockCode)|
+----------------+---------------+
|          85123A|          22138|
+----------------+---------------+



In [13]:
df.show()

+---------+---------+--------------------+--------+--------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|   InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+--------------+---------+----------+--------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|12/1/2010 8:26|     2.55|     17850|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|12/1/2010 8:26|     3.39|     17850|United Kingdom|
|   536365|   84406B|CREAM CUPID HEART...|       8|12/1/2010 8:26|     2.75|     17850|United Kingdom|
|   536365|   84029G|KNITTED UNION FLA...|       6|12/1/2010 8:26|     3.39|     17850|United Kingdom|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|12/1/2010 8:26|     3.39|     17850|United Kingdom|
|   536365|    22752|SET 7 BABUSHKA NE...|       2|12/1/2010 8:26|     7.65|     17850|United Kingdom|
|   536365|    21730|GLASS STAR FROSTE...|       6|12/1/2010 8:26|     4.

In [14]:
from pyspark.sql.functions import min, max

In [15]:
df.select(min("Quantity"), max("Quantity")).show()

+-------------+-------------+
|min(Quantity)|max(Quantity)|
+-------------+-------------+
|       -80995|        80995|
+-------------+-------------+



In [16]:
from pyspark.sql.functions import sumDistinct

In [17]:
sum_distinct = df.select(sumDistinct("Quantity")).show()



+----------------------+
|sum(DISTINCT Quantity)|
+----------------------+
|                 29310|
+----------------------+



In [19]:
from pyspark.sql.functions import var_pop, stddev_pop
from pyspark.sql.functions import var_samp, stddev_samp

In [20]:
df.select(var_pop("Quantity"), stddev_pop("Quantity"),
         var_samp("Quantity"), stddev_pop("Quantity")).show()

+-----------------+--------------------+------------------+--------------------+
|var_pop(Quantity)|stddev_pop(Quantity)|var_samp(Quantity)|stddev_pop(Quantity)|
+-----------------+--------------------+------------------+--------------------+
|47559.30364660879|  218.08095663447733| 47559.39140929848|  218.08095663447733|
+-----------------+--------------------+------------------+--------------------+



In [21]:
"""Although you can calculate average by dividing sum distinct by count.
Pyspark provides a beter way to calculate average"""
from pyspark.sql.functions import count, avg, expr, sum

In [22]:
df.select(sum("Quantity").alias("Total_Purchases"),
         count("Quantity").alias("Total_Transactions"),
         avg("Quantity").alias("Average_Purchases"),
         expr("mean(Quantity)").alias("Mean_Purchases"))\
.selectExpr("Total_Purchases/Total_Transactions",
           "Average_Purchases",
           "Mean_Purchases").show()

+--------------------------------------+-----------------+----------------+
|(Total_Purchases / Total_Transactions)|Average_Purchases|  Mean_Purchases|
+--------------------------------------+-----------------+----------------+
|                      9.55224954743324| 9.55224954743324|9.55224954743324|
+--------------------------------------+-----------------+----------------+



## Skewness and Kurtosis

In [23]:
# Both are measurement of the extreme points in the data
# Skewness measures the asymmetry of the values in your data around the mean
# Kurtosis measures the tail of the data

In [24]:
from pyspark.sql.functions import skewness, kurtosis

In [25]:
df.select(skewness("Quantity"), kurtosis("Quantity")).show()

+------------------+------------------+
|skewness(Quantity)|kurtosis(Quantity)|
+------------------+------------------+
|-0.264075576105298|119768.05495534067|
+------------------+------------------+



## Covariance and Correlation

In [26]:
# Here we are trying to measure the interactions of the values in two different columns
# Covariance: 
# Correlation: Measures the pearsons correlatio coefficient which is scaled between -1 and +1.
# Covariance: This is scaled according to the inputs in the data.

In [27]:
from pyspark.sql.functions import corr, covar_pop, covar_samp

In [28]:
df.select(corr("Quantity", "InvoiceNo"), covar_pop("Quantity","InvoiceNo"), covar_pop("Quantity","InvoiceNo")).show()

+-------------------------+------------------------------+------------------------------+
|corr(Quantity, InvoiceNo)|covar_pop(Quantity, InvoiceNo)|covar_pop(Quantity, InvoiceNo)|
+-------------------------+------------------------------+------------------------------+
|     4.912186085636769E-4|            1052.7260778751527|            1052.7260778751527|
+-------------------------+------------------------------+------------------------------+



## Aggregating to complex types

In [29]:
#Aggregating to Complex Types
from pyspark.sql.functions import collect_set, collect_list

In [30]:
df.select(collect_set("Country"), collect_list("Country")).show()
#collect_list: Gives the values in a column in a list
#collect_set: 

+--------------------+---------------------+
|collect_set(Country)|collect_list(Country)|
+--------------------+---------------------+
|[Portugal, Italy,...| [United Kingdom, ...|
+--------------------+---------------------+



## Grouping
#### Grouping with expressions, grouping with maps, and just groupBy

#### first you have to use the column name whcih you have to groupBy and use the aggregations


In [31]:
df.groupBy("InvoiceNo","CustomerID").count().show(5)

+---------+----------+-----+
|InvoiceNo|CustomerID|count|
+---------+----------+-----+
|   536846|     14573|   76|
|   537026|     12395|   12|
|   537883|     14437|    5|
|   538068|     17978|   12|
|   538279|     14952|    7|
+---------+----------+-----+
only showing top 5 rows



In [32]:
from pyspark.sql.functions import sum, col, desc

In [33]:
df.groupBy("InvoiceNo","CustomerID").count().filter(col("CustomerID").isNotNull()).sort(desc("count")).show()

+---------+----------+-----+
|InvoiceNo|CustomerID|count|
+---------+----------+-----+
|   576339|     14096|  542|
|   579196|     14096|  533|
|   580727|     14096|  529|
|   578270|     14096|  442|
|   573576|     14096|  435|
|   567656|     14096|  421|
|   567183|     14769|  399|
|   575607|     14096|  377|
|   571441|     14096|  364|
|   570488|     14096|  353|
|   572552|     14096|  352|
|   568346|     14096|  335|
|   547063|     14769|  294|
|   569246|     14096|  285|
|   562031|     16984|  277|
|   554098|     14769|  264|
|   570672|     12536|  259|
|   543040|     17337|  259|
|   569897|     17813|  239|
|   572103|     17571|  223|
+---------+----------+-----+
only showing top 20 rows

