In [12]:
import findspark
findspark.init('/usr/local/spark')
import pyspark
from pyspark.sql import SparkSession

In [13]:
spark=SparkSession.builder.appName("Python Spark SQL Example").getOrCreate()

In [14]:
customerDF=spark.read.load('/home/hduser/Downloads/customers.txt',format="csv",sep="\t",inferSchema="true",header="true")

In [15]:
customerDF.printSchema()

root
 |-- customer_id: integer (nullable = true)
 |-- customer_name: string (nullable = true)
 |-- customer_city: string (nullable = true)
 |-- customer_state: string (nullable = true)
 |-- customer_zipcode: integer (nullable = true)



In [17]:
customerDF.select(customerDF["customer_name"]).show()

+----------------+
|   customer_name|
+----------------+
|     Mary Torres|
|      Jose Haley|
|      Mary Smith|
|  Richard Maddox|
|  Margaret Booth|
|  Mary Henderson|
|     Lisa Walker|
|   Jonathan Hill|
|Carolyn Sheppard|
|    Mary Mendoza|
|   Michael Smith|
|    James Holmes|
|     Mary Dawson|
|    Adam Marquez|
|    Gloria Smith|
|       Mary Webb|
|  Nancy Alvarado|
|  Russell Flores|
|    Denise Smith|
|  Jose Dickerson|
+----------------+
only showing top 20 rows



In [21]:
customerDF.select(customerDF['customer_name'],customerDF['customer_city']).show()

+----------------+-------------+
|   customer_name|customer_city|
+----------------+-------------+
|     Mary Torres|       Caguas|
|      Jose Haley|     Columbus|
|      Mary Smith|      Houston|
|  Richard Maddox|       Caguas|
|  Margaret Booth|    Arlington|
|  Mary Henderson|       Caguas|
|     Lisa Walker|       Caguas|
|   Jonathan Hill|      Phoenix|
|Carolyn Sheppard|Pompano Beach|
|    Mary Mendoza|       Caguas|
|   Michael Smith|       Caguas|
|    James Holmes|     Hilliard|
|     Mary Dawson|       Caguas|
|    Adam Marquez|  San Antonio|
|    Gloria Smith|       Caguas|
|       Mary Webb|   San Marcos|
|  Nancy Alvarado|     Flushing|
|  Russell Flores|       Caguas|
|    Denise Smith|    Rego Park|
|  Jose Dickerson|         Mesa|
+----------------+-------------+
only showing top 20 rows



In [23]:
customerDF.filter(customerDF['customer_state']=='CA').show()

+-----------+----------------+---------------+--------------+----------------+
|customer_id|   customer_name|  customer_city|customer_state|customer_zipcode|
+-----------+----------------+---------------+--------------+----------------+
|       5577|      Mary Smith|        Modesto|            CA|           95350|
|       1745|      Mary Smith|Rowland Heights|            CA|           91748|
|      11444|Kathleen Patrick|      San Diego|            CA|           92109|
|       8846|    Thomas Smith|          Indio|            CA|           92201|
|       6237|  Bobby Anderson|       El Cajon|            CA|           92020|
|       4085|       Mary Carr|  Panorama City|            CA|           91402|
|       8705|  Patricia Smith|       Stockton|            CA|           95207|
|       3669|       Mary Soto| San Bernardino|            CA|           92410|
|       6101|      Mary Smith|    Los Angeles|            CA|           90033|
|      11697|  Jessica Thomas|  Laguna Niguel|      

In [24]:
customerDF.groupby('customer_state').count().show()

+--------------+-----+
|customer_state|count|
+--------------+-----+
|            AZ|   19|
|            SC|    2|
|            LA|    7|
|            MN|    1|
|            NJ|   19|
|            DC|    4|
|            OR|    4|
|            VA|   14|
|            RI|    2|
|            KY|    1|
|            MI|   28|
|            NV|   16|
|            WI|    9|
|            ID|    2|
|            CA|  187|
|            CT|    8|
|            NC|   19|
|            MD|   19|
|            DE|    1|
|            MO|   13|
+--------------+-----+
only showing top 20 rows



In [32]:
a=customerDF.groupby('customer_state').count()
a.show()
b=a.filter(a['count']>50)
b.show()

+--------------+-----+
|customer_state|count|
+--------------+-----+
|            AZ|   19|
|            SC|    2|
|            LA|    7|
|            MN|    1|
|            NJ|   19|
|            DC|    4|
|            OR|    4|
|            VA|   14|
|            RI|    2|
|            KY|    1|
|            MI|   28|
|            NV|   16|
|            WI|    9|
|            ID|    2|
|            CA|  187|
|            CT|    8|
|            NC|   19|
|            MD|   19|
|            DE|    1|
|            MO|   13|
+--------------+-----+
only showing top 20 rows

+--------------+-----+
|customer_state|count|
+--------------+-----+
|            CA|  187|
|            NY|   79|
|            TX|   62|
|            PR|  505|
+--------------+-----+



# Creating a temporary view for running SQL Queries

In [27]:
customerDF.createOrReplaceTempView("customers")

In [36]:
cStateCount50=spark.sql("Select customer_state, count(*) as state_count from customers group by customer_state having state_count>=50")
cStateCount50.show()

+--------------+-----------+
|customer_state|state_count|
+--------------+-----------+
|            CA|        187|
|            NY|         79|
|            TX|         62|
|            PR|        505|
+--------------+-----------+



In [35]:
type(cStateCount50)

pyspark.sql.dataframe.DataFrame

In [37]:
cStateCount50.printSchema()

root
 |-- customer_state: string (nullable = true)
 |-- state_count: long (nullable = false)



In [38]:
cStateCount50.coalesce(1).write.save("cStateCount50.parquet")

# cStateCount50.coalesce(1).write.save("cStateCount50",format='parquet')
#cStateCount50.coalesce(1).write.parquet("cStateCount50.parquet")

In [39]:
cStateCount50.coalesce(1).write.json("cStateCount50.json")

# loading Product file in json format

In [40]:
productDF=spark.read.load("/home/hduser/Downloads/products.json",format="json")

In [41]:
productDF.printSchema()

root
 |-- category_id: long (nullable = true)
 |-- customer_id: long (nullable = true)
 |-- product_category: string (nullable = true)
 |-- product_name: string (nullable = true)
 |-- product_price: double (nullable = true)
 |-- product_quantity: long (nullable = true)
 |-- salestxn_id: long (nullable = true)



In [44]:
yelpDF=spark.read.load("/home/hduser/Downloads/sharedfolder/yelp_dataset",format="json")

In [43]:
yelpDF.printSchema()

root
 |-- _corrupt_record: string (nullable = true)
 |-- address: string (nullable = true)
 |-- attributes: struct (nullable = true)
 |    |-- AcceptsInsurance: string (nullable = true)
 |    |-- AgesAllowed: string (nullable = true)
 |    |-- Alcohol: string (nullable = true)
 |    |-- Ambience: string (nullable = true)
 |    |-- BYOB: string (nullable = true)
 |    |-- BYOBCorkage: string (nullable = true)
 |    |-- BestNights: string (nullable = true)
 |    |-- BikeParking: string (nullable = true)
 |    |-- BusinessAcceptsBitcoin: string (nullable = true)
 |    |-- BusinessAcceptsCreditCards: string (nullable = true)
 |    |-- BusinessParking: string (nullable = true)
 |    |-- ByAppointmentOnly: string (nullable = true)
 |    |-- Caters: string (nullable = true)
 |    |-- CoatCheck: string (nullable = true)
 |    |-- Corkage: string (nullable = true)
 |    |-- DietaryRestrictions: string (nullable = true)
 |    |-- DogsAllowed: string (nullable = true)
 |    |-- DriveThru: string 

In [46]:
yelpDF.createOrReplaceTempView("yelp")

In [52]:
yelp_data=spark.sql("select average_stars from yelp limit 20")
yelp_data.show()

+-------------+
|average_stars|
+-------------+
|         null|
|         null|
|         null|
|         null|
|         null|
|         null|
|         null|
|         null|
|         null|
|         null|
|         null|
|         null|
|         null|
|         null|
|         null|
|         null|
|         null|
|         null|
|         null|
|         null|
+-------------+



In [53]:
productDF.select(productDF["product_name"]).show()

+--------------------+
|        product_name|
+--------------------+
|O'Brien Men's Neo...|
|O'Brien Men's Neo...|
|Under Armour Wome...|
|O'Brien Men's Neo...|
|Pelican Sunstream...|
|Nike Men's CJ Eli...|
|Diamondback Women...|
|Field & Stream Sp...|
|Perfect Fitness P...|
|Nike Men's CJ Eli...|
|Pelican Sunstream...|
|Nike Men's CJ Eli...|
|Diamondback Women...|
|Nike Men's CJ Eli...|
|Nike Men's Dri-FI...|
|O'Brien Men's Neo...|
|O'Brien Men's Neo...|
|Nike Men's Dri-FI...|
|Diamondback Women...|
|Under Armour Girl...|
+--------------------+
only showing top 20 rows



In [54]:
productDF.printSchema()

root
 |-- category_id: long (nullable = true)
 |-- customer_id: long (nullable = true)
 |-- product_category: string (nullable = true)
 |-- product_name: string (nullable = true)
 |-- product_price: double (nullable = true)
 |-- product_quantity: long (nullable = true)
 |-- salestxn_id: long (nullable = true)



In [57]:
productDF.select(productDf['product_name'],productDF['product_category'],productDF['product_price']).show()

NameError: name 'productDf' is not defined

In [58]:
productDF.filter(productDF["product_price"]>=200).show()

+-----------+-----------+------------------+--------------------+-------------+----------------+-----------+
|category_id|customer_id|  product_category|        product_name|product_price|product_quantity|salestxn_id|
+-----------+-----------+------------------+--------------------+-------------+----------------+-----------+
|         42|        702| Bike & Skate Shop|Diamondback Women...|       299.98|               1|     140220|
|         44|       3959|Hunting & Shooting|Field & Stream Sp...|       399.98|               1|      77426|
|         42|       5658| Bike & Skate Shop|Diamondback Women...|       299.98|               1|      84894|
|         42|       9356| Bike & Skate Shop|Diamondback Women...|       299.98|               1|     102807|
|         42|       8651| Bike & Skate Shop|Diamondback Women...|       299.98|               1|     134324|
|         44|      12072|Hunting & Shooting|Field & Stream Sp...|       399.98|               1|      27476|
|         42|      

In [60]:
productDF.groupby("product_category").count().show()

+-------------------+-----+
|   product_category|count|
+-------------------+-----+
|  Training by Sport|    5|
|   Men's Golf Clubs|   21|
|   Camping & Hiking|   44|
|Fitness Accessories|   47|
|         Golf Shoes|    6|
|         Basketball|   36|
|        Electronics|   48|
|          Team Shop|  162|
|      Men's Apparel| 2085|
|  Bike & Skate Shop| 1377|
|  Golf Bags & Carts|   89|
|    As Seen on  TV!| 2399|
|       Boxing & MMA|  115|
| Hunting & Shooting| 1785|
|Baseball & Softball|    4|
|       Golf Apparel|   51|
| Women's Golf Clubs|   57|
|      Shop By Sport|   26|
|            Fishing| 1953|
|        Accessories|  110|
+-------------------+-----+
only showing top 20 rows



# Create temp view for running sql queries 

In [72]:
productDF.createOrReplaceTempView("products")
#productDF.printSchema()

In [77]:
prd=spark.sql("select * from products")
prd.show()

+-----------+-----------+------------------+--------------------+-------------+----------------+-----------+
|category_id|customer_id|  product_category|        product_name|product_price|product_quantity|salestxn_id|
+-----------+-----------+------------------+--------------------+-------------+----------------+-----------+
|         45|      12080|           Fishing|O'Brien Men's Neo...|        49.98|               2|      98660|
|         45|      12279|           Fishing|O'Brien Men's Neo...|        49.98|               2|      97749|
|         12|       3343|      Boxing & MMA|Under Armour Wome...|        31.99|               4|     103889|
|         45|      12382|           Fishing|O'Brien Men's Neo...|        49.98|               5|      22751|
|         47|       5032|           Boating|Pelican Sunstream...|       199.99|               1|     129001|
|         17|       2663|            Cleats|Nike Men's CJ Eli...|       129.99|               1|     165110|
|         42|      

In [78]:
prd200=spark.sql("select category_id , product_category, count(*) as prdcount from products where product_price>200 group by category_id,product_category order by product_category")

In [79]:
prd200.printSchema()

root
 |-- category_id: long (nullable = true)
 |-- product_category: string (nullable = true)
 |-- prdcount: long (nullable = false)



In [80]:
prd200.show()

+-----------+-------------------+--------+
|category_id|   product_category|prdcount|
+-----------+-------------------+--------+
|         40|        Accessories|       7|
|         16|    As Seen on  TV!|       6|
|          3|Baseball & Softball|       4|
|         42|  Bike & Skate Shop|    1377|
|         47|            Boating|       6|
|          9|   Cardio Equipment|       3|
|         37|        Electronics|       9|
|         34|  Golf Bags & Carts|      10|
|         44| Hunting & Shooting|    1785|
|         30|   Men's Golf Clubs|       5|
|         10|  Strength Training|       2|
|          6|   Tennis & Racquet|       3|
+-----------+-------------------+--------+



In [81]:
prd200.coalesce(1).write.save("prd200.parquet")

In [82]:
prd.show()

+-----------+-----------+------------------+--------------------+-------------+----------------+-----------+
|category_id|customer_id|  product_category|        product_name|product_price|product_quantity|salestxn_id|
+-----------+-----------+------------------+--------------------+-------------+----------------+-----------+
|         45|      12080|           Fishing|O'Brien Men's Neo...|        49.98|               2|      98660|
|         45|      12279|           Fishing|O'Brien Men's Neo...|        49.98|               2|      97749|
|         12|       3343|      Boxing & MMA|Under Armour Wome...|        31.99|               4|     103889|
|         45|      12382|           Fishing|O'Brien Men's Neo...|        49.98|               5|      22751|
|         47|       5032|           Boating|Pelican Sunstream...|       199.99|               1|     129001|
|         17|       2663|            Cleats|Nike Men's CJ Eli...|       129.99|               1|     165110|
|         42|      

In [84]:
cust=spark.sql("select * from customers")
cust.show()

+-----------+----------------+-------------+--------------+----------------+
|customer_id|   customer_name|customer_city|customer_state|customer_zipcode|
+-----------+----------------+-------------+--------------+----------------+
|      11039|     Mary Torres|       Caguas|            PR|             725|
|       5623|      Jose Haley|     Columbus|            OH|           43207|
|       5829|      Mary Smith|      Houston|            TX|           77015|
|       6336|  Richard Maddox|       Caguas|            PR|             725|
|       1708|  Margaret Booth|    Arlington|            TX|           76010|
|      10227|  Mary Henderson|       Caguas|            PR|             725|
|        839|     Lisa Walker|       Caguas|            PR|             725|
|       7604|   Jonathan Hill|      Phoenix|            AZ|           85040|
|       6485|Carolyn Sheppard|Pompano Beach|            FL|           33063|
|       4737|    Mary Mendoza|       Caguas|            PR|             725|

In [88]:
custlist=spark.sql("select c.customer_name,p.product_category, count(*) as prdcount from customers c inner join products p on c.customer_id=p.customer_id where product_price>200 group by c.customer_name,p.product_category having prdcount>2")

In [89]:
custlist.printSchema()

root
 |-- customer_name: string (nullable = true)
 |-- product_category: string (nullable = true)
 |-- prdcount: long (nullable = false)



In [90]:
custlist.show()

+-------------+------------------+--------+
|customer_name|  product_category|prdcount|
+-------------+------------------+--------+
|William Smith|Hunting & Shooting|       3|
|   Mary Smith| Bike & Skate Shop|      22|
|   Mary Smith|Hunting & Shooting|      24|
+-------------+------------------+--------+

