In [13]:
orders = (spark
             .read
             .format("jdbc")
             .option("url", "jdbc:mysql://localhost/retail_db")
             .option("driver", "com.mysql.jdbc.Driver")
             .option("dbtable", "orders")
             .option("user", "root")
             .option("password", "cloudera")
             .load())

In [14]:
orders.show(5)

+--------+-------------------+-----------------+---------------+
|order_id|         order_date|order_customer_id|   order_status|
+--------+-------------------+-----------------+---------------+
|       1|2013-07-25 00:00:00|            11599|         CLOSED|
|       2|2013-07-25 00:00:00|              256|PENDING_PAYMENT|
|       3|2013-07-25 00:00:00|            12111|       COMPLETE|
|       4|2013-07-25 00:00:00|             8827|         CLOSED|
|       5|2013-07-25 00:00:00|            11318|       COMPLETE|
+--------+-------------------+-----------------+---------------+
only showing top 5 rows



In [15]:
orders.write.format("parquet").mode("overwrite").save("orders")

In [16]:
orders = spark.read.load("orders")
orders.show(6)

+--------+-------------------+-----------------+---------------+
|order_id|         order_date|order_customer_id|   order_status|
+--------+-------------------+-----------------+---------------+
|       1|2013-07-25 00:00:00|            11599|         CLOSED|
|       2|2013-07-25 00:00:00|              256|PENDING_PAYMENT|
|       3|2013-07-25 00:00:00|            12111|       COMPLETE|
|       4|2013-07-25 00:00:00|             8827|         CLOSED|
|       5|2013-07-25 00:00:00|            11318|       COMPLETE|
|       6|2013-07-25 00:00:00|             7130|       COMPLETE|
+--------+-------------------+-----------------+---------------+
only showing top 6 rows



In [18]:
customers = (spark
             .read
             .format("jdbc")
             .option("url", "jdbc:mysql://localhost/retail_db")
             .option("driver", "com.mysql.jdbc.Driver")
             .option("dbtable", "customers")
             .option("user", "root")
             .option("password", "cloudera")
             .load())
customers.show(5)

+-----------+--------------+--------------+--------------+-----------------+--------------------+-------------+--------------+----------------+
|customer_id|customer_fname|customer_lname|customer_email|customer_password|     customer_street|customer_city|customer_state|customer_zipcode|
+-----------+--------------+--------------+--------------+-----------------+--------------------+-------------+--------------+----------------+
|          1|       Richard|     Hernandez|     XXXXXXXXX|        XXXXXXXXX|  6303 Heather Plaza|  Brownsville|            TX|           78521|
|          2|          Mary|       Barrett|     XXXXXXXXX|        XXXXXXXXX|9526 Noble Embers...|    Littleton|            CO|           80126|
|          3|           Ann|         Smith|     XXXXXXXXX|        XXXXXXXXX|3422 Blue Pioneer...|       Caguas|            PR|           00725|
|          4|          Mary|         Jones|     XXXXXXXXX|        XXXXXXXXX|  8324 Little Common|   San Marcos|            CA|          

In [24]:
from pyspark.sql.functions import *

In [25]:
(orders.alias("t1")
.join(customers.alias("t2"), col("t1.order_customer_id") == col("t2.customer_id"))
.filter("t1.order_status == 'COMPLETE'")
.groupby("order_customer_id", "customer_fname", "customer_lname")
.count()
.orderBy(col("count").desc())
.show(10))

+-----------------+--------------+--------------+-----+
|order_customer_id|customer_fname|customer_lname|count|
+-----------------+--------------+--------------+-----+
|             9337|          Mary|         Smith|   10|
|             3710|        Ashley|         Smith|    9|
|             7802|          Mary|       Acevedo|    9|
|              749|         Jesse|      Matthews|    9|
|            11061|        Joseph|       Webster|    8|
|              221|          Mary|           Cox|    8|
|             5186|         Jason|      Robinson|    8|
|             7910|          Mary|       Camacho|    8|
|             5283|         Jacob|      Guerrero|    8|
|             2469|         Shawn|         Smith|    8|
+-----------------+--------------+--------------+-----+
only showing top 10 rows



In [26]:

orders.write.mode("overwrite").saveAsTable("orders")

In [28]:
sql("show tables").show()

+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
| default|   orders|      false|
+--------+---------+-----------+



In [29]:
spark.table("orders").show(10)

+--------+-------------------+-----------------+---------------+
|order_id|         order_date|order_customer_id|   order_status|
+--------+-------------------+-----------------+---------------+
|       1|2013-07-25 00:00:00|            11599|         CLOSED|
|       2|2013-07-25 00:00:00|              256|PENDING_PAYMENT|
|       3|2013-07-25 00:00:00|            12111|       COMPLETE|
|       4|2013-07-25 00:00:00|             8827|         CLOSED|
|       5|2013-07-25 00:00:00|            11318|       COMPLETE|
|       6|2013-07-25 00:00:00|             7130|       COMPLETE|
|       7|2013-07-25 00:00:00|             4530|       COMPLETE|
|       8|2013-07-25 00:00:00|             2911|     PROCESSING|
|       9|2013-07-25 00:00:00|             5657|PENDING_PAYMENT|
|      10|2013-07-25 00:00:00|             5648|PENDING_PAYMENT|
+--------+-------------------+-----------------+---------------+
only showing top 10 rows



In [30]:

sql("describe formatted orders").toPandas()

Unnamed: 0,col_name,data_type,comment
0,order_id,int,
1,order_date,timestamp,
2,order_customer_id,int,
3,order_status,string,
4,,,
5,# Detailed Table Information,,
6,Database,default,
7,Table,orders,
8,Owner,cloudera,
9,Created Time,Tue Apr 24 01:20:35 PDT 2018,


In [31]:

sql("describe formatted orders").toPandas()

Unnamed: 0,col_name,data_type,comment
0,order_id,int,
1,order_date,timestamp,
2,order_customer_id,int,
3,order_status,string,
4,,,
5,# Detailed Table Information,,
6,Database,default,
7,Table,orders,
8,Owner,cloudera,
9,Created Time,Tue Apr 24 01:20:35 PDT 2018,


In [32]:
sql("drop table orders").show()

++
||
++
++



In [33]:
sfpd = (spark
        .read
        .format("csv")
        .option("header", True)
        .option("inferSchema", True)
        .load("sfpd")
       )

In [34]:
sfpd.printSchema()

root
 |-- IncidntNum: integer (nullable = true)
 |-- Category: string (nullable = true)
 |-- Descript: string (nullable = true)
 |-- DayOfWeek: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- Time: string (nullable = true)
 |-- PdDistrict: string (nullable = true)
 |-- Resolution: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- X: double (nullable = true)
 |-- Y: double (nullable = true)
 |-- Location: string (nullable = true)



In [35]:
sfpd.limit(5).toPandas()

Unnamed: 0,IncidntNum,Category,Descript,DayOfWeek,Date,Time,PdDistrict,Resolution,Address,X,Y,Location
0,50436712,ASSAULT,BATTERY,Wednesday,04/20/2005 12:00:00 AM,04:00,MISSION,NONE,18TH ST / CASTRO ST,-122.435003,37.760888,"(37.7608878061245, -122.435002864271)"
1,80049078,LARCENY/THEFT,GRAND THEFT FROM A BUILDING,Sunday,01/13/2008 12:00:00 AM,18:00,PARK,NONE,1100 Block of CLAYTON ST,-122.446838,37.762255,"(37.7622550270122, -122.446837820235)"
2,130366639,ASSAULT,AGGRAVATED ASSAULT WITH A KNIFE,Sunday,05/05/2013 12:00:00 AM,04:10,INGLESIDE,"ARREST, BOOKED",0 Block of SGTJOHNVYOUNG LN,-122.444707,37.724931,"(37.7249307267936, -122.444707063455)"
3,30810835,DRIVING UNDER THE INFLUENCE,DRIVING WHILE UNDER THE INFLUENCE OF ALCOHOL,Tuesday,07/08/2003 12:00:00 AM,01:00,SOUTHERN,"ARREST, BOOKED",MASON ST / TURK ST,-122.408954,37.783288,"(37.7832878735491, -122.408953598286)"
4,130839567,OTHER OFFENSES,TRAFFIC VIOLATION ARREST,Friday,10/04/2013 12:00:00 AM,20:53,TENDERLOIN,"ARREST, BOOKED",TURK ST / LEAVENWORTH ST,-122.414056,37.782793,"(37.7827931071006, -122.414056291891)"
