In [4]:
from pyspark.sql import SparkSession

In [5]:
# Create (or reuse) the Spark session for this notebook.
# The application name labels this job in Spark's UI, so it describes what the
# notebook actually does (exploring the sales dataset) instead of a name left
# over from an unrelated wine-quality project.
sess = (
    SparkSession.builder
    .appName('Explore sales data with Spark DataFrames')
    .getOrCreate()
)

In [6]:
# Echo the session object as the cell output to confirm it was created.
sess

In [35]:
# If a Hive metastore exception is raised here, delete the db.lck file
# from C:\tools\spark2\bin\code\metastore_db
# Load the sales CSV, taking the column names from its header line.
salesDF = (
    sess.read
    .format('csv')
    .option('header', 'true')
    .load('dataset/sales.csv')
)

In [37]:
# Check what kind of object the reader returned.
type(salesDF)

pyspark.sql.dataframe.DataFrame

In [62]:
# Inspect the (column name, type) pairs; the recorded output lists every column as 'string'.
salesDF.dtypes

[('InvoiceNo', 'string'),
 ('StockCode', 'string'),
 ('Description', 'string'),
 ('Quantity', 'string'),
 ('InvoiceDate', 'string'),
 ('UnitPrice', 'string'),
 ('CustomerID', 'string'),
 ('Country', 'string')]

### DataFrame actions
##### collect(); count(); first(); head(); show(); take();
###### http://spark.apache.org/docs/2.1.0/api/python/pyspark.sql.html

### DataFrame select

In [55]:
# Preview the first five rows of the raw sales data.
salesDF.show(n=5)

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|2010-12-01 08:26:00|     2.55|   17850.0|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|2010-12-01 08:26:00|     3.39|   17850.0|United Kingdom|
|   536365|   84406B|CREAM CUPID HEART...|       8|2010-12-01 08:26:00|     2.75|   17850.0|United Kingdom|
|   536365|   84029G|KNITTED UNION FLA...|       6|2010-12-01 08:26:00|     3.39|   17850.0|United Kingdom|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|2010-12-01 08:26:00|     3.39|   17850.0|United Kingdom|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
only showing top 5 rows



In [39]:
# Project just two columns and preview the first five rows.
salesDF.select(["Country", "InvoiceDate"]).show(n=5)

+--------------+-------------------+
|       Country|        InvoiceDate|
+--------------+-------------------+
|United Kingdom|2010-12-01 08:26:00|
|United Kingdom|2010-12-01 08:26:00|
|United Kingdom|2010-12-01 08:26:00|
|United Kingdom|2010-12-01 08:26:00|
|United Kingdom|2010-12-01 08:26:00|
+--------------+-------------------+
only showing top 5 rows



In [49]:
# Count how many different countries appear in the data.
(
    salesDF
    .select("Country")
    .distinct()
    .count()
)

7

In [43]:
# Referencing a column on the DataFrame yields a Column object (see the output), not its values.
type(salesDF["UnitPrice"])

pyspark.sql.column.Column

### DataFrame filter

In [66]:
# Keep only the rows whose unit price exceeds 100 and preview up to five.
# UnitPrice was loaded as a string column, so cast it explicitly before the
# numeric comparison instead of relying on Spark's implicit string-to-number
# coercion, whose rules can differ between Spark versions and SQL modes.
salesDF.filter(salesDF.UnitPrice.cast('double') > 100).show(5)

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|   536392|    22827|RUSTIC  SEVENTEEN...|       1|2010-12-01 10:29:00|    165.0|   13705.0|United Kingdom|
|   536544|      DOT|      DOTCOM POSTAGE|       1|2010-12-01 14:32:00|   569.77|      null|United Kingdom|
|   536592|      DOT|      DOTCOM POSTAGE|       1|2010-12-01 17:06:00|   607.49|      null|United Kingdom|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+



### Column type cast

In [131]:
from pyspark.sql.functions import to_date
from pyspark.sql.types import FloatType

# Cast the string UnitPrice column to float, as a one-column DataFrame.
priceDF = salesDF.select(
    salesDF['UnitPrice'].cast(FloatType()).alias('UnitPriceFloat')
)

# Convert the InvoiceDate strings to a date-typed column.
dateDF = salesDF.select(
    to_date(salesDF['InvoiceDate']).alias('Date')
)

In [98]:
# For each derived frame, print its Python type followed by its column types.
for frame in (priceDF, dateDF):
    print(type(frame))
    print(frame.dtypes)

<class 'pyspark.sql.dataframe.DataFrame'>
[('UnitPriceFloat', 'float')]
<class 'pyspark.sql.dataframe.DataFrame'>
[('Date', 'date')]


### Add new column

In [104]:
# Append the float-typed price as a new column alongside the original ones...
salesDF1 = salesDF.withColumn(
    'UnitPriceFloat',
    salesDF['UnitPrice'].cast(FloatType()),
)
# ...then append a date-typed column derived from InvoiceDate.
salesDF2 = salesDF1.withColumn(
    'DateOnly',
    to_date(salesDF1['InvoiceDate']),
)

In [105]:
# Verify that the two new columns were appended with the expected types.
salesDF2.dtypes

[('InvoiceNo', 'string'),
 ('StockCode', 'string'),
 ('Description', 'string'),
 ('Quantity', 'string'),
 ('InvoiceDate', 'string'),
 ('UnitPrice', 'string'),
 ('CustomerID', 'string'),
 ('Country', 'string'),
 ('UnitPriceFloat', 'float'),
 ('DateOnly', 'date')]

### Using UDF

In [132]:
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType


def _to_int_or_none(s):
    """Parse a string as a Python int, passing missing values through.

    A null cell reaches a Python UDF as None, and int(None) raises TypeError,
    which would fail the whole Spark job. Nulls are therefore returned as None
    so the resulting column simply stays null for those rows.
    """
    return int(s) if s is not None else None


# UDF: turns a Python function into a PySpark column function.
# int()  is the Python cast
# cast() is the PySpark cast
fCast = udf(_to_int_or_none, IntegerType())
salesDF3 = salesDF2.withColumn('QuantityInt', fCast(salesDF2.Quantity))
salesDF3.show(2)

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+--------------+----------+-----------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|UnitPriceFloat|  DateOnly|QuantityInt|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+--------------+----------+-----------+
|   536365|   85123A|WHITE HANGING HEA...|       6|2010-12-01 08:26:00|     2.55|   17850.0|United Kingdom|          2.55|2010-12-01|          6|
|   536365|    71053| WHITE METAL LANTERN|       6|2010-12-01 08:26:00|     3.39|   17850.0|United Kingdom|          3.39|2010-12-01|          6|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+--------------+----------+-----------+
only showing top 2 rows



In [125]:
# Confirm the UDF result was added as an int-typed column.
salesDF3.dtypes

[('InvoiceNo', 'string'),
 ('StockCode', 'string'),
 ('Description', 'string'),
 ('Quantity', 'string'),
 ('InvoiceDate', 'string'),
 ('UnitPrice', 'string'),
 ('CustomerID', 'string'),
 ('Country', 'string'),
 ('UnitPriceFloat', 'float'),
 ('DateOnly', 'date'),
 ('QuantityInt', 'int')]

### DataFrame groupBy


In [133]:
# Total the float unit prices per country and preview five groups.
(
    salesDF3
    .groupBy('Country')
    .sum('UnitPriceFloat')
    .show(n=5)
)

+---------+-------------------+
|  Country|sum(UnitPriceFloat)|
+---------+-------------------+
|  Germany|  93.81999984383583|
|   France|  55.29000025987625|
|     EIRE|   133.639998793602|
|   Norway| 102.66999903321266|
|Australia|  73.89999914169312|
+---------+-------------------+
only showing top 5 rows



In [134]:
# Per country: total unit price and average quantity, given as a
# {column: aggregate function} mapping; preview five groups.
(
    salesDF3
    .groupBy('Country')
    .agg({
        "UnitPriceFloat": "sum",
        "QuantityInt": "mean",
    })
    .show(n=5)
)

+---------+-------------------+------------------+
|  Country|sum(UnitPriceFloat)|  avg(QuantityInt)|
+---------+-------------------+------------------+
|  Germany|  93.81999984383583|4.0344827586206895|
|   France|  55.29000025987625|             22.45|
|     EIRE|   133.639998793602|11.571428571428571|
|   Norway| 102.66999903321266| 25.36986301369863|
|Australia|  73.89999914169312| 7.642857142857143|
+---------+-------------------+------------------+
only showing top 5 rows



In [4]:
# Scratch arithmetic: sum of the squares of five decimal values.
# NOTE(review): this cell does not reference any Spark object defined above — confirm whether it belongs in this notebook.
0.2**2 + 0.8**2 + 0.7**2 + 0.4**2 + 1.3**2

3.0200000000000005