# PySpark Evaluation

Date:  May 19, 2018

Candidate:  << Raj >>

In [1]:
# fill in with command to read parquet file into a dataframe
parqFileName = 's3://caserta-bucket1/train.pqt'
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark import SparkConf
from pyspark.sql import SparkSession

sc = SparkContext.getOrCreate()
sqlContext = SQLContext(sc)
df = sqlContext.read.parquet(parqFileName)
df.count()


878049

### Get the total count of crimes that happened on a Tuesday

In [60]:
df.where(df.DayOfWeek=='Tuesday').count()

124965

In [5]:
df


DataFrame[Dates: timestamp, Category: string, Descript: string, DayOfWeek: string, PdDistrict: string, Resolution: string, Address: string, X: float, Y: float]

In [26]:
df.printSchema()

root
 |-- Dates: timestamp (nullable = true)
 |-- Category: string (nullable = true)
 |-- Descript: string (nullable = true)
 |-- DayOfWeek: string (nullable = true)
 |-- PdDistrict: string (nullable = true)
 |-- Resolution: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- X: float (nullable = true)
 |-- Y: float (nullable = true)



### Which Police District has the most crime?

In [66]:
df.describe('PdDistrict').show()

+-------+----------+
|summary|PdDistrict|
+-------+----------+
|  count|    878049|
|   mean|      null|
| stddev|      null|
|    min|   BAYVIEW|
|    max|TENDERLOIN|
+-------+----------+



### Find the maximum length of each string column

In [62]:
df.registerTempTable("dft")
sqlContext.sql("select max(length(Category)), max(length(Descript)), max(length(DayOfWeek)), max(length(PdDistrict)), max(length(Resolution)), max(length(Address))  from dft").show()

+---------------------+---------------------+----------------------+-----------------------+-----------------------+--------------------+
|max(length(Category))|max(length(Descript))|max(length(DayOfWeek))|max(length(PdDistrict))|max(length(Resolution))|max(length(Address))|
+---------------------+---------------------+----------------------+-----------------------+-----------------------+--------------------+
|                   27|                   73|                     9|                     10|                     38|                  44|
+---------------------+---------------------+----------------------+-----------------------+-----------------------+--------------------+



### Find the min, max, and average of each column that is a number of any type

In [48]:
sqlContext.sql("select min(X), max(X), avg(X), min(Y), max(Y), avg(Y) from dft").show()

+----------+------+-------------------+--------+------+-----------------+
|    min(X)|max(X)|             avg(X)|  min(Y)|max(Y)|           avg(Y)|
+----------+------+-------------------+--------+------+-----------------+
|-122.51364|-120.5|-122.42261639461971|37.70788|  90.0|37.77102031342325|
+----------+------+-------------------+--------+------+-----------------+



### Create a new dataframe with a new integer column called "addr_words" to have the number of words contained in the Address column.  Display the first 5 rows of Address and addr_words.

In [67]:
import pyspark.sql.functions as f
dfaddrwords = dfaddrwords.withColumn('addr_words', f.size(f.split(f.col('Address'), ' ')))
dfaddrwords.show(50)

+--------------------+--------------+--------------------+---------+----------+--------------+--------------------+-----------+---------+----------+
|               Dates|      Category|            Descript|DayOfWeek|PdDistrict|    Resolution|             Address|          X|        Y|addr_words|
+--------------------+--------------+--------------------+---------+----------+--------------+--------------------+-----------+---------+----------+
|2015-05-14 03:53:...|      WARRANTS|      WARRANT ARREST|Wednesday|  NORTHERN|ARREST, BOOKED|  OAK ST / LAGUNA ST| -122.42589|37.774597|         5|
|2015-05-14 03:53:...|OTHER OFFENSES|TRAFFIC VIOLATION...|Wednesday|  NORTHERN|ARREST, BOOKED|  OAK ST / LAGUNA ST| -122.42589|37.774597|         5|
|2015-05-14 03:33:...|OTHER OFFENSES|TRAFFIC VIOLATION...|Wednesday|  NORTHERN|ARREST, BOOKED|VANNESS AV / GREE...| -122.42436|37.800415|         5|
|2015-05-14 03:30:...| LARCENY/THEFT|GRAND THEFT FROM ...|Wednesday|  NORTHERN|          NONE|1500 Block o

### What is the maximum and minimum number of words in the Address column?

In [81]:
dfaddrwords.select(f.max('addr_words'), f.min('addr_words')).show() 


+---------------+---------------+
|max(addr_words)|min(addr_words)|
+---------------+---------------+
|             10|              3|
+---------------+---------------+



### Rename the column "PdDistrict" to "PoliceDistrict" and print the schema

In [89]:
dfrename = df.selectExpr("Dates", "Category", "Descript", "Address", "PdDistrict as PoliceDistrict").show()


+--------------------+--------------+--------------------+--------------------+--------------+
|               Dates|      Category|            Descript|             Address|PoliceDistrict|
+--------------------+--------------+--------------------+--------------------+--------------+
|2015-05-14 03:53:...|      WARRANTS|      WARRANT ARREST|  OAK ST / LAGUNA ST|      NORTHERN|
|2015-05-14 03:53:...|OTHER OFFENSES|TRAFFIC VIOLATION...|  OAK ST / LAGUNA ST|      NORTHERN|
|2015-05-14 03:33:...|OTHER OFFENSES|TRAFFIC VIOLATION...|VANNESS AV / GREE...|      NORTHERN|
|2015-05-14 03:30:...| LARCENY/THEFT|GRAND THEFT FROM ...|1500 Block of LOM...|      NORTHERN|
|2015-05-14 03:30:...| LARCENY/THEFT|GRAND THEFT FROM ...|100 Block of BROD...|          PARK|
|2015-05-14 03:30:...| LARCENY/THEFT|GRAND THEFT FROM ...| 0 Block of TEDDY AV|     INGLESIDE|
|2015-05-14 03:30:...| VEHICLE THEFT|   STOLEN AUTOMOBILE| AVALON AV / PERU AV|     INGLESIDE|
|2015-05-14 03:30:...| VEHICLE THEFT|   STOLEN AUT

### Consider the following code snippet ...

In [92]:
gameFileLoc = "s3://caserta-bucket1/Seahawks/game.csv"

gameDF = spark.read.format("org.apache.spark.sql.execution.datasources.csv.CSVFileFormat") \
            .option("path",gameFileLoc) \
            .option("sep","\t") \
            .option("header","true") \
            .option("inferSchema","true") \
            .option("mode", "FAILFAST") \
            .load(gameFileLoc)


### There is a problem with a column in the resulting dataframe.  Determine what the problem is and resolve the issue.

In [94]:
gameFileLoc = "s3://caserta-bucket1/Seahawks/game.csv"

In [95]:
gameDF = spark.read.format("org.apache.spark.sql.execution.datasources.csv.CSVFileFormat") \
            .option("path",gameFileLoc) \
            .option("sep","\t") \
            .option("header","true") \
            .option("inferSchema","true") \
            .option("mode", "FAILFAST") \
            .load(gameFileLoc)

In [97]:
gameDF.count()


2689

In [98]:
gameDF.show()


+-------+------+------------+---------------+---------+------------------+---------+----+---------+--------+----------------+----------------+-------+--------+-------------+--------------+---------+------------+--------------------+
|GameKey|PlayID|HomeClubCode|VisitorClubCode| GameDate|UniversalTimeClock|ClockTime|Down|YardsToGo|YardLine|AbsoluteYardLine|SpecialTeamsPlay|Quarter|Sequence|IsScoringPlay|PossessionTeam|HomeScore|VisitorScore|     PlayDescription|
+-------+------+------------+---------------+---------+------------------+---------+----+---------+--------+----------------+----------------+-------+--------+-------------+--------------+---------+------------+--------------------+
|  55774|    53|         SEA|            TEN|8/11/2012|           2:05:58|    15:00|   0|        0|  SEA 35|              65|               1|      1|      53|            0|           SEA|        0|           0|C.Wiggs kicks 65 ...|
|  55774|    71|         SEA|            TEN|8/11/2012|           2:

In [99]:
gameDF.printSchema()

root
 |-- GameKey: integer (nullable = true)
 |-- PlayID: integer (nullable = true)
 |-- HomeClubCode: string (nullable = true)
 |-- VisitorClubCode: string (nullable = true)
 |-- GameDate: string (nullable = true)
 |-- UniversalTimeClock: string (nullable = true)
 |-- ClockTime: string (nullable = true)
 |-- Down: integer (nullable = true)
 |-- YardsToGo: integer (nullable = true)
 |-- YardLine: string (nullable = true)
 |-- AbsoluteYardLine: integer (nullable = true)
 |-- SpecialTeamsPlay: integer (nullable = true)
 |-- Quarter: integer (nullable = true)
 |-- Sequence: integer (nullable = true)
 |-- IsScoringPlay: integer (nullable = true)
 |-- PossessionTeam: string (nullable = true)
 |-- HomeScore: integer (nullable = true)
 |-- VisitorScore: integer (nullable = true)
 |-- PlayDescription: string (nullable = true)

