# Spark SQL

HW 5, Labs 5 and 6

#### Inspect the Data

In [2]:
# Download the course data archive and unpack it into the working directory
! wget http://idsdl.csom.umn.edu/c/share/sparkdata.zip
! unzip sparkdata.zip

# List the files in the loudacre directory
!ls loudacre

# Peek at the first two lines of each JSON file used below
!head -2 loudacre/webpage.json    # webpages
!head -2 loudacre/device.json     # devices
!head -2 loudacre/websitehit.json # hits

accountdevice.tsv  calllog.tsv		   device.tsv	      webpage.tsv
accounts.tsv	   customerservicerep.tsv  knowledgebase.tsv  websitehit.json
basestations.tsv   device.json		   webpage.json       websitehit.tsv


---

# Dataframe

### From the file directly

In [None]:
# CSV: let Spark infer the column types, then attach the column names.
# NOTE(review): `data_file` and `cols` are not defined in this cell - confirm
# they are set earlier in the session before running it.
bids = (
    spark.read
    .option("inferSchema", "true")
    .csv(data_file)
    .toDF(*cols)
)

# JSON: the schema comes from the file itself; print it and pull a few rows.
df = spark.read.json("file:/databricks/driver/yelp.json")
df.printSchema()
df.take(5)

# Delimited text with a header row and a custom delimiter, cached for reuse.
data = (
    spark.read
    .option("header", "true")
    .option("delimiter", "\\t")
    .csv("/databricks-...")
    .cache()
)

In [None]:
# Schema written as a DDL-style string: comma-separated "<column> <type>" pairs.
schema_str = """
    auctionid long, bid double, bidtime double, bidder string,
    bidrate long, openbid double, price double,  
    itemtype string, dtl long
"""

# Load the CSV again, this time with the explicit schema applied.
bids = (
    spark.read
    .schema(schema_str)
    .format("csv")
    .load("sparkdata/auctiondata.csv")
)

In [None]:
# Schema as a StructType
# Import only the type classes used below instead of `import *`, so it is
# clear where each name comes from and nothing else leaks into the namespace.
from pyspark.sql.types import (
    DoubleType,
    LongType,
    StringType,
    StructField,
    StructType,
)

# One StructField per CSV column: (name, type, nullable).
schema_structtype = StructType([
    StructField("auctionid", LongType(), True),
    StructField("bid", DoubleType(), True),
    StructField("bidtime", DoubleType(), True),
    StructField("bidder", StringType(), True),
    StructField("bidrate", LongType(), True),
    StructField("openbid", DoubleType(), True),
    StructField("price", DoubleType(), True),
    StructField("itemtype", StringType(), True),
    StructField("dtl", LongType(), True),
])

# Read the CSV with the explicit StructType schema.
bids = spark.read.schema(schema_structtype).csv("sparkdata/auctiondata.csv")

### From RDD

In [None]:
from pyspark.sql import Row

data_file = "sparkdata/auctiondata.csv"
rawDataRDD = sc.textFile(data_file).cache()

# Split every text line into its comma-separated fields.
csvRDD = rawDataRDD.map(lambda l: l.split(","))


def to_bid_row(p):
    """Convert one list of string fields into a typed Row.

    `p` is the list produced by splitting a CSV line on commas; fields are
    read by position (0-8). `auctionid` is parsed as an integer so the column
    type agrees with the `long` declared in the schema-based reads and with
    the integer comparison used when filtering on it.
    """
    return Row(
        auctionid=int(p[0]),
        bid=float(p[1]),
        bidtime=float(p[2]),
        bidder=p[3],
        bidrate=int(p[4]),
        openbid=float(p[5]),
        price=float(p[6]),
        itemtype=p[7],
        dtl=int(p[8]),
    )


# Convert each field list to a Row, then build the DataFrame from the Rows.
rowRDD = csvRDD.map(to_bid_row)

bids = spark.createDataFrame(rowRDD)

---

## Working with dataframe

#### Filter, select, sort, show

In [None]:
# Bids for a single auction, latest bid time first.
(
    bids
    .filter(bids.auctionid == 1645914432)
    .select("bid", "bidder", "bidtime")
    .sort(bids.bidtime.desc())
    .show()
)

#### View with sql

In [None]:
# Register the DataFrame under the name "bids" so SQL text can refer to it.
bids.createOrReplaceTempView("bids")

# Highest price and number of rows per item type.
itemtypes_query = """
    SELECT itemtype, max(price) as max_price, count(*) as num_bids 
    from bids 
    group by itemtype
"""
itemtypes = spark.sql(itemtypes_query)

#### DataFrame API `agg` function instead of SQL

In [None]:
import pyspark.sql.functions as f

# Highest price and number of bids per item type.
# count("*") counts rows, matching `count(*)` in the SQL version of this
# query; counting the price column would skip rows whose price is null.
(
    bids
    .groupBy("itemtype")
    .agg(
        f.max(bids.price).alias("max_price"),
        f.count("*").alias("num_bids"),
    )
    .show()
)

# Highest price per auction, restricted to the 'cartier' item type.
maxprices = (
    bids
    .select("auctionid", "itemtype", "price")
    .filter(bids.itemtype == 'cartier')
    .groupBy("auctionid")
    .agg(f.max("price").alias("max_price"))
)


# Template snippets - replace the placeholder column names before running.

# Unique values: distinct() is a method and must be called; take() needs a row count
df.select('column').distinct().take(5)

# Getting ready to value_counts: one output row per element of an array column
df.select('col_id',
          (f.explode(df.column)).alias('alias_name'))

# Group by and count, largest groups first.
# The chain is wrapped in parentheses so it can span several lines, and it
# sorts by the aggregated column because only the grouping column and the
# alias exist after agg().
(
    df.groupBy(df.col)
    .agg(f.count(df.id).alias('alias'))
    .sort('alias', ascending=False)
    .take(5)
)

#### Writing to csv

In [None]:
# Write dataframe into file
maxprices.write.csv("maxprices")

# Verify - there are multiple files, parallel processing (each partition of your data may write its own output)
!ls -l maxprices/

# Take all data into one file
! cat maxprices/* > maxprices.csv

# Head of the file
! head maxprices.csv

---

# JSON

#### Read the file

In [9]:
# Load the webpage records from the JSON file into a DataFrame.
webpage = spark.read.json(path="loudacre/webpage.json")

#### How does it look? (using .show, converting to pd)

In [None]:
# Column names and types as read from the JSON file
webpage.printSchema()

# First 3 rows, with long cell values shown in full
webpage.show(n=3, truncate=False)

# Restrict to 3 rows, then bring them to the driver as a pandas DataFrame
webpage.limit(3).toPandas()

#### The `associated_files` field holds a comma-separated list of values. Create a new df that explodes it so that each element gets its own row, alongside its `web_page_num`

In [79]:
import pyspark.sql.functions as f

# Split associated_files on commas and emit one row per resulting element,
# keeping the page number next to each element.
page_files = webpage.select(
    "web_page_num",
    f.explode(f.split(webpage.associated_files, ",")).alias("assoc_file"),
)

# Check the result
page_files.show()

#### Inner join this new df with the original

In [82]:
# Join on the shared web_page_num column, then keep the page number,
# the page file name and the exploded associated file.
webpage_files = (
    webpage
    .join(page_files, "web_page_num")
    .select("web_page_num", "web_page_file_name", page_files.assoc_file)
)

# Check the result
webpage_files.show()

## Working with json

#### For each webpage, what are the top 2 devices used for visiting this page?

Use window functions with `dense_rank()`  
[DataBricks WindowFunc](https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html).

In [85]:
# NOTE(review): these paths start with sparkdata/loudacre/ while other cells
# read from loudacre/ - confirm which location holds the files.

# Website hits: load and preview
hits = spark.read.json(path="sparkdata/loudacre/websitehit.json")
hits.show(3)

# Devices: load and preview
devices = spark.read.json(path="sparkdata/loudacre/device.json")
devices.show(3)

#### View - Hits after grouping by page and device

In [127]:
# Register the DataFrame as a temporary view named "hits" so it can be
# queried with SQL; the `hits` DataFrame itself is left unchanged.
hits.createOrReplaceTempView("hits")

# Number of hits per (web page, device) pair.
# Keep the DataFrame in `stat` and call show() separately: show() only
# prints and returns None, so chaining it onto the assignment would leave
# `stat` unusable by the cells that follow.
stat = spark.sql("""
    select web_page_id, device_id, count(*) as hits 
    from hits 
    group by web_page_id, device_id 
    order by web_page_id, count(*) desc
""").cache()

stat.show(5)

#### Window function - create partitions by web page

In [129]:
from pyspark.sql.window import Window

# One partition per web page; within a partition, rows ordered by hits, highest first.
wind = (
    Window
    .partitionBy(stat.web_page_id)
    .orderBy(stat.hits.desc())
)

In [136]:
# dense_rank().over(wind) numbers the rows inside each window partition
# following the window's ordering; rows with equal hits share a rank.
# Keeping r < 3 retains ranks 1 and 2 for every web page.
top2 = (
    stat
    .select(
        "web_page_id",
        "device_id",
        "hits",
        f.dense_rank().over(wind).alias("r"),
    )
    .where("r<3")
)

#### Joining the tables to select the correct col names

In [138]:
# Look up the device name and page file name for each ranked row, then keep
# only those two columns together with the hit count.
top2_revised = (
    top2
    .join(devices, top2.device_id == devices.device_num)
    .join(webpage, top2.web_page_id == webpage.web_page_num)
    .select(webpage.web_page_file_name, devices.device_name, top2.hits)
)

# Display the result with full (untruncated) cell contents
top2_revised.show(truncate=False)