In [1]:
pip install pyspark

Defaulting to user installation because normal site-packages is not writeable
Note: you may need to restart the kernel to use updated packages.


In [2]:
pip install findspark

Defaulting to user installation because normal site-packages is not writeable
Note: you may need to restart the kernel to use updated packages.


In [3]:
import pyspark 
import findspark
findspark.init()

In [4]:
# Create a Spark session first
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('example').getOrCreate()

24/10/15 11:06:48 WARN Utils: Your hostname, hexagon resolves to a loopback address: 127.0.1.1; using 192.168.99.98 instead (on interface wlx00c0cab05cdd)
24/10/15 11:06:48 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/10/15 11:06:48 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [5]:
spark

In [6]:
orders_df = spark.createDataFrame(
    data = [(100, 1, 1, 50.1, 1, "Thingamjig", 5, "Joe Reis"),
            (100, 2, 2, 25.08, 2, "Whatchamacallit", 5, "Joe Reis"),
            (101, 1, 3, 75.23, 1, "Whoozeewhatzit", 7, "Matt Housley")],
    schema = """ OrderID long, ItemNumber integer, SKU integer, Price double, Quantity integer, Name string, CustomerID long, CustomerName string""")

In [7]:
orders_df.show()

                                                                                

+-------+----------+---+-----+--------+---------------+----------+------------+
|OrderID|ItemNumber|SKU|Price|Quantity|           Name|CustomerID|CustomerName|
+-------+----------+---+-----+--------+---------------+----------+------------+
|    100|         1|  1| 50.1|       1|     Thingamjig|         5|    Joe Reis|
|    100|         2|  2|25.08|       2|Whatchamacallit|         5|    Joe Reis|
|    101|         1|  3|75.23|       1| Whoozeewhatzit|         7|Matt Housley|
+-------+----------+---+-----+--------+---------------+----------+------------+



In [8]:
# now lets import a csv file into a dataframe,  still using the original spark object to do this:
transactions_df = spark.read.csv('online_retail_map.csv', header=True)

In [9]:
transactions_df.show(n=6)

+-------+---------+--------------------+--------+--------------+-----+-----------+--------------+
|Invoice|StockCode|         Description|Quantity|   InvoiceDate|Price|Customer ID|       Country|
+-------+---------+--------------------+--------+--------------+-----+-----------+--------------+
| 489434|    85048|15CM CHRISTMAS GL...|      12|12/1/2009 7:45| 6.95|      13085|United Kingdom|
| 489434|   79323P|  PINK CHERRY LIGHTS|      12|12/1/2009 7:45| 6.75|      13085|United Kingdom|
| 489434|   79323W| WHITE CHERRY LIGHTS|      12|12/1/2009 7:45| 6.75|      13085|United Kingdom|
| 489434|    22041|"RECORD FRAME 7""...|      48|12/1/2009 7:45|  2.1|      13085|United Kingdom|
| 489434|    21232|STRAWBERRY CERAMI...|      24|12/1/2009 7:45| 1.25|      13085|United Kingdom|
| 489434|    22064|PINK DOUGHNUT TRI...|      24|12/1/2009 7:45| 1.65|      13085|United Kingdom|
+-------+---------+--------------------+--------+--------------+-----+-----------+--------------+
only showing top 6 r

In [10]:
# show the columns
transactions_df.columns

['Invoice',
 'StockCode',
 'Description',
 'Quantity',
 'InvoiceDate',
 'Price',
 'Customer ID',
 'Country']

In [11]:
transactions_df.select('Price', 'Quantity', 'Country').show(n=6)

+-----+--------+--------------+
|Price|Quantity|       Country|
+-----+--------+--------------+
| 6.95|      12|United Kingdom|
| 6.75|      12|United Kingdom|
| 6.75|      12|United Kingdom|
|  2.1|      48|United Kingdom|
| 1.25|      24|United Kingdom|
| 1.65|      24|United Kingdom|
+-----+--------+--------------+
only showing top 6 rows



In [12]:
# get summaries of columns
transactions_df.select('Price', 'Quantity', 'Country').describe().show()



+-------+------------------+------------------+-----------+
|summary|             Price|          Quantity|    Country|
+-------+------------------+------------------+-----------+
|  count|           1067371|           1067371|    1067371|
|   mean| 4.649387727415791|   9.9388984711033|       NULL|
| stddev|123.55305872146302|172.70579407675285|       NULL|
|    min|         -11062.06|                -1|  Australia|
|    max|             99.96|               992|West Indies|
+-------+------------------+------------------+-----------+



                                                                                

In [13]:
# Create a new column that is a calculation of two other columns
transactions_df = transactions_df.withColumn(colName='Amount', col= transactions_df.Price * transactions_df.Quantity)

In [14]:
transactions_df.show(n=6)

+-------+---------+--------------------+--------+--------------+-----+-----------+--------------+------------------+
|Invoice|StockCode|         Description|Quantity|   InvoiceDate|Price|Customer ID|       Country|            Amount|
+-------+---------+--------------------+--------+--------------+-----+-----------+--------------+------------------+
| 489434|    85048|15CM CHRISTMAS GL...|      12|12/1/2009 7:45| 6.95|      13085|United Kingdom|              83.4|
| 489434|   79323P|  PINK CHERRY LIGHTS|      12|12/1/2009 7:45| 6.75|      13085|United Kingdom|              81.0|
| 489434|   79323W| WHITE CHERRY LIGHTS|      12|12/1/2009 7:45| 6.75|      13085|United Kingdom|              81.0|
| 489434|    22041|"RECORD FRAME 7""...|      48|12/1/2009 7:45|  2.1|      13085|United Kingdom|100.80000000000001|
| 489434|    21232|STRAWBERRY CERAMI...|      24|12/1/2009 7:45| 1.25|      13085|United Kingdom|              30.0|
| 489434|    22064|PINK DOUGHNUT TRI...|      24|12/1/2009 7:45|

In [15]:
# lets rename an existing column name
transactions_df = transactions_df.withColumnRenamed(existing='Invoice', new='ID')

In [16]:
# lets drop a column
transactions_df = transactions_df.drop('Description')

In [17]:
transactions_df.show(n=6)

+------+---------+--------+--------------+-----+-----------+--------------+------------------+
|    ID|StockCode|Quantity|   InvoiceDate|Price|Customer ID|       Country|            Amount|
+------+---------+--------+--------------+-----+-----------+--------------+------------------+
|489434|    85048|      12|12/1/2009 7:45| 6.95|      13085|United Kingdom|              83.4|
|489434|   79323P|      12|12/1/2009 7:45| 6.75|      13085|United Kingdom|              81.0|
|489434|   79323W|      12|12/1/2009 7:45| 6.75|      13085|United Kingdom|              81.0|
|489434|    22041|      48|12/1/2009 7:45|  2.1|      13085|United Kingdom|100.80000000000001|
|489434|    21232|      24|12/1/2009 7:45| 1.25|      13085|United Kingdom|              30.0|
|489434|    22064|      24|12/1/2009 7:45| 1.65|      13085|United Kingdom|39.599999999999994|
+------+---------+--------+--------------+-----+-----------+--------------+------------------+
only showing top 6 rows



In [18]:
# delete rows with null values
transactions_df = transactions_df.dropna()

In [19]:
# lets filter rows 
transactions_df = transactions_df.filter(transactions_df.Quantity>0)

In [20]:
transactions_df.show(n=6)

+------+---------+--------+--------------+-----+-----------+--------------+------------------+
|    ID|StockCode|Quantity|   InvoiceDate|Price|Customer ID|       Country|            Amount|
+------+---------+--------+--------------+-----+-----------+--------------+------------------+
|489434|    85048|      12|12/1/2009 7:45| 6.95|      13085|United Kingdom|              83.4|
|489434|   79323P|      12|12/1/2009 7:45| 6.75|      13085|United Kingdom|              81.0|
|489434|   79323W|      12|12/1/2009 7:45| 6.75|      13085|United Kingdom|              81.0|
|489434|    22041|      48|12/1/2009 7:45|  2.1|      13085|United Kingdom|100.80000000000001|
|489434|    21232|      24|12/1/2009 7:45| 1.25|      13085|United Kingdom|              30.0|
|489434|    22064|      24|12/1/2009 7:45| 1.65|      13085|United Kingdom|39.599999999999994|
+------+---------+--------+--------------+-----+-----------+--------------+------------------+
only showing top 6 rows



In [21]:
# calculate total amount spent on each order
transactions_df.groupby("ID").sum("Amount").show()

[Stage 12:>                                                         (0 + 8) / 8]

+------+------------------+
|    ID|       sum(Amount)|
+------+------------------+
|489677|             192.0|
|491045|             303.2|
|491658|155.05999999999997|
|493542|            118.75|
|493977|            275.95|
|494244|            6711.0|
|494277|           1335.92|
|495185|           2507.06|
|495783|             48.96|
|496171|199.29999999999998|
|496233|188.82999999999998|
|496427|291.14000000000004|
|497229| 312.5899999999999|
|498070|207.15000000000006|
|498125|190.95000000000005|
|498328|            275.04|
|500148|431.59000000000015|
|500903|           2655.96|
|500979|              11.9|
|501046|             612.0|
+------+------------------+
only showing top 20 rows



                                                                                

In [22]:
# total number of rows for each country in descending order
transactions_df.groupby("Country").count().orderBy('count', ascending=False).show()



+---------------+------+
|        Country| count|
+---------------+------+
| United Kingdom|725296|
|        Germany| 16703|
|           EIRE| 15745|
|         France| 13813|
|    Netherlands|  5093|
|          Spain|  3720|
|        Belgium|  3069|
|    Switzerland|  3012|
|       Portugal|  2446|
|      Australia|  1815|
|Channel Islands|  1569|
|          Italy|  1468|
|         Norway|  1437|
|         Sweden|  1319|
|         Cyprus|  1155|
|        Finland|  1032|
|        Austria|   922|
|        Denmark|   798|
|         Greece|   657|
|    Unspecified|   521|
+---------------+------+
only showing top 20 rows



                                                                                

In [25]:
from pyspark.sql.functions import udf
@udf('string')
def toUpper(word:str):
    return word.upper()

In [26]:


## udf_to_upper = udf(toUpper, returnType='string')
transactions_df.select('ID', toUpper('Country')).show(n=6)

+------+----------------+
|    ID|toUpper(Country)|
+------+----------------+
|489434|  UNITED KINGDOM|
|489434|  UNITED KINGDOM|
|489434|  UNITED KINGDOM|
|489434|  UNITED KINGDOM|
|489434|  UNITED KINGDOM|
|489434|  UNITED KINGDOM|
+------+----------------+
only showing top 6 rows



In [27]:
# Let's replace an existing column in place with new values
transactions_df = transactions_df.withColumn(colName='Country', 
                                             col=toUpper('Country'))

In [28]:
transactions_df.show(n=6)

+------+---------+--------+--------------+-----+-----------+--------------+------------------+
|    ID|StockCode|Quantity|   InvoiceDate|Price|Customer ID|       Country|            Amount|
+------+---------+--------+--------------+-----+-----------+--------------+------------------+
|489434|    85048|      12|12/1/2009 7:45| 6.95|      13085|UNITED KINGDOM|              83.4|
|489434|   79323P|      12|12/1/2009 7:45| 6.75|      13085|UNITED KINGDOM|              81.0|
|489434|   79323W|      12|12/1/2009 7:45| 6.75|      13085|UNITED KINGDOM|              81.0|
|489434|    22041|      48|12/1/2009 7:45|  2.1|      13085|UNITED KINGDOM|100.80000000000001|
|489434|    21232|      24|12/1/2009 7:45| 1.25|      13085|UNITED KINGDOM|              30.0|
|489434|    22064|      24|12/1/2009 7:45| 1.65|      13085|UNITED KINGDOM|39.599999999999994|
+------+---------+--------+--------------+-----+-----------+--------------+------------------+
only showing top 6 rows



In [29]:
transactions_df.createOrReplaceTempView('orders')

In [30]:
total_df = spark.sql("""
SELECT ID, SUM(amount) as total
from orders
GROUP BY ID
ORDER BY total DESC
""")

In [31]:
total_df.show(n=6)



+------+------------------+
|    ID|             total|
+------+------------------+
|581483|          168469.6|
|541431|           77183.6|
|493819|44051.600000000006|
|556444|           38970.0|
|524181|33167.799999999996|
|537659|31770.979999999996|
+------+------------------+
only showing top 6 rows



                                                                                

In [32]:
def toLower(word:str):
    return word.lower()

In [33]:
_ = spark.udf.register("udf_to_lower", toLower)

In [34]:
spark.sql("""
SELECT DISTINCT udf_to_lower(country)
FROM orders
""").show()



+---------------------+
|udf_to_lower(country)|
+---------------------+
|              finland|
|            australia|
|               greece|
|             portugal|
|              nigeria|
|               poland|
|              austria|
|                malta|
|                japan|
|          switzerland|
|               sweden|
|          netherlands|
| united arab emirates|
|                 eire|
|               france|
|          unspecified|
|       united kingdom|
|              germany|
|                  rsa|
|                italy|
+---------------------+
only showing top 20 rows



                                                                                

In [35]:
product_category_df = spark.createDataFrame(
    data = [(22423, 'category_a'), (21212, 'category_b'), 
            (21232, 'category_c'), (84879, 'category_a')],
    schema = 'itemID string, category string')

In [36]:
product_category_df.createTempView('items')

In [38]:
spark.sql("""
SELECT category, AVG(amount)
from items
LEFT JOIN orders ON items.itemID = orders.stockcode
GROUP by category
""").show()



+----------+------------------+
|  category|       avg(amount)|
+----------+------------------+
|category_c|20.994959999999978|
|category_a| 66.62807636539425|
|category_b|16.604122079879243|
+----------+------------------+



                                                                                