In [1]:
# List the top-level directories of DBFS.
display(dbutils.fs.ls("dbfs:/"))

path,name,size
dbfs:/FileStore/,FileStore/,0
dbfs:/databricks-datasets/,databricks-datasets/,0
dbfs:/databricks-results/,databricks-results/,0
dbfs:/tmp/,tmp/,0


In [2]:
# List the contents of the DBFS directory that holds the AWS files
# (the credentials JSON opened in the next cell lives under FileStore/aws).
dbutils.fs.ls("dbfs:/FileStore/aws")

In [3]:
import json

# Location of the AWS credentials JSON, addressed through the /dbfs path
# (presumably the same FileStore/aws directory listed above -- confirm).
file_location = '/dbfs/FileStore/aws/aws_cred.json'

# Read the whole file and parse it into `config`.
with open(file_location, mode='r') as json_data_file:
    config = json.loads(json_data_file.read())

In [4]:
# S3 identifiers. SCHEME is the URI scheme used by get_s3_path to build
# object paths; S3_RESOURCE is not referenced in the visible cells.
S3_RESOURCE = 's3'
SCHEME = 's3a'

# Source bucket and key prefix (PREFIX is empty, i.e. the file sits at the
# bucket root).
### BEGIN STRIP ###
BUCKET_NAME = 'lead-us-car'
PREFIX = ''
### END STRIP ###

# Name of the CSV object to load from the bucket.
INPUT_FILENAME = 'US_Accidents_June20.csv'

In [5]:
# Placeholder values -- never commit real keys in a notebook.
# NOTE(review): `config` (loaded from aws_cred.json above) is presumably the
# intended source for these two values -- confirm its key names and read
# them from there instead of typing the keys here.
ACCESS_KEY="_"
SECRET_KEY="_"

### Add your credentials to Spark 
hadoop_conf = sc._jsc.hadoopConfiguration()

# Legacy s3n property names (kept for backward compatibility).
hadoop_conf.set("fs.s3n.awsAccessKeyId", ACCESS_KEY)
hadoop_conf.set("fs.s3n.awsSecretAccessKey", SECRET_KEY)

# Every path in this notebook uses the s3a scheme, and the S3A connector
# reads its credentials from these property names, not from the s3n ones.
hadoop_conf.set("fs.s3a.access.key", ACCESS_KEY)
hadoop_conf.set("fs.s3a.secret.key", SECRET_KEY)

In [6]:
def get_s3_path(key, bucket_name=BUCKET_NAME, scheme=SCHEME):
    """Build the full URI of an object in a bucket.

    Args:
        key: Object key inside the bucket.
        bucket_name: Bucket that holds the object (defaults to BUCKET_NAME).
        scheme: URI scheme to prefix (defaults to SCHEME).

    Returns:
        The string ``<scheme>://<bucket_name>/<key>``.
    """
    return "{}://{}/{}".format(scheme, bucket_name, key)

In [7]:
# Join only the non-empty parts of the key: with an empty PREFIX a plain
# f'{PREFIX}/{INPUT_FILENAME}' starts with "/" and produces a double slash
# in the URI (s3a://<bucket>//<file>).
key_parts = [part.strip('/') for part in (PREFIX, INPUT_FILENAME)]
filepath = get_s3_path('/'.join(part for part in key_parts if part))
filepath

In [8]:
# Load the raw CSV: the first line is the header, and column types are
# inferred from the data.
us_car_accidents = spark.read.csv(filepath, header=True, inferSchema=True)

In [9]:
# Print the column names and the types inferred while reading the CSV.
us_car_accidents.printSchema()

In [10]:
from pyspark.sql import functions as F
from pyspark.sql.types import StringType, IntegerType, DateType, FloatType

In [11]:
# Keep only the columns needed for the analysis: the state, the severity,
# and the month and year extracted from the accident start time.
start_time = F.col('Start_Time')
us_car_accidents_final = us_car_accidents.select(
    F.col('State').cast('string'),
    F.col('Severity').cast('int'),
    F.month(start_time).alias('month'),
    F.year(start_time).alias('year'),
)

In [12]:
# Preview the first five rows of the trimmed dataset.
display(us_car_accidents_final.take(5))

State,Severity,month,year
OH,3,2,2016
OH,2,2,2016
OH,2,2,2016
OH,3,2,2016
OH,2,2,2016


In [13]:
# Total number of rows; the reference value for the per-column counts
# reported by describe() in the next cell.
us_car_accidents_final.count()

In [14]:
# Check for null values: the `count` row of describe() is the number of
# non-null values per column, so any column whose count is lower than the
# total row count contains nulls.
display(us_car_accidents_final.describe())

summary,State,Severity,month,year
count,3513617,3513617.0,3513617.0,3513617.0
mean,,2.339928626256077,6.525489829995699,2018.1404342021344
stddev,,0.5521934519055779,3.487269603061736,1.2398291446768033
min,AL,1.0,1.0,2016.0
max,WY,4.0,12.0,2020.0


In [15]:
### Accidents per state, largest count first
us_car_accident_grouped = (
    us_car_accidents_final
    .groupBy('State')
    .count()
    .sort(F.col('count').desc())
)

### Show the ranked counts
display(us_car_accident_grouped)

State,count
CA,816825
TX,329284
FL,258002
SC,173277
NC,165958
NY,160817
PA,106787
IL,99692
VA,96075
MI,95983


In [16]:
BUCKET_NAME="lead-us-car"
DIRECTORY="final"

# Pass the credentials to the S3A connector through the Hadoop configuration
# instead of interpolating them into the URI. Keys embedded in a URI end up
# in logs and in the displayed path, and a secret containing "/" breaks the
# URI. Setting them here is idempotent and keeps this cell self-sufficient.
hadoop_conf = sc._jsc.hadoopConfiguration()
hadoop_conf.set("fs.s3a.access.key", ACCESS_KEY)
hadoop_conf.set("fs.s3a.secret.key", SECRET_KEY)

# Destination built with the same helper used for the input path.
output_path = get_s3_path(DIRECTORY, bucket_name=BUCKET_NAME)

# NOTE: the default save mode raises if the destination already exists, so
# re-running this cell requires removing the previous output first.
(us_car_accident_grouped
 .coalesce(1) # to force saving in one file
 .write
 .format("csv") # built-in CSV source, same name as used for the read
 .option("header", "true")
 .save(output_path))