# Module 2: Data Analysis with Apache Spark

## Create RDD

In [None]:
# Squares of 0..99, held as an ordinary Python list in the notebook.
a = [n ** 2 for n in range(100)]

In [None]:
# Distribute the local list `a` as an RDD; Spark chooses the partition count.
# To choose it yourself, pass numSlices, e.g. sc.parallelize(a, numSlices=10).
b = sc.parallelize(a)

## Import RDD Data

In [None]:
# Read sales.csv from DBFS as an RDD of text lines (one element per line).
sales_csv_path = '/FileStore/tables/8ik9k75m1506409315082/sales.csv'
rdd = sc.textFile(sales_csv_path)

## Challenge: Import Data

In [None]:
# Challenge solution: read clients.csv the same way. This rebinds `rdd`,
# replacing the sales RDD created in the previous cell.
clients_csv_path = '/FileStore/tables/75b42w5k1506411921260/clients.csv'
rdd = sc.textFile(clients_csv_path)

## Databricks Datasets 

In [None]:
%fs ls /databricks-datasets/

## Import Data into a DataFrame


In [None]:
# Method 1: DataFrameReader.csv with its default options -- no header parsing
# and no type inference, so the first line is read as data and every column
# is a string with a generated name (_c0, _c1, ...).
df = spark.read.csv(
    "/databricks-datasets/online_retail/data-001/data.csv"
)
display(df)

In [None]:
# Method 2: the generic DataFrameReader.load with explicit options.
#   header='true'      -> take column names from the first line of the file
#   inferSchema='true' -> scan the data to choose column types (costs an
#                         extra pass over the input)
# 'csv' is Spark's built-in CSV source; the old 'com.databricks.spark.csv'
# name is only a backward-compatibility alias for it.
df = spark.read.load(
    "/databricks-datasets/online_retail/data-001/data.csv",
    format='csv',
    header='true',
    inferSchema='true',
)

display(df)

In [None]:
# take a look at our schema: prints each column's name, type and nullability
# as Spark resolved them when `df` was loaded
df.printSchema()

In [None]:
# Method 3: read through an explicit SQLContext, the pre-SparkSession entry point.
# NOTE: SQLContext is deprecated since Spark 3.0 in favour of SparkSession
# (the `spark` object used above); it is kept here to illustrate the legacy API.
from pyspark.sql import SQLContext

sqlContext = SQLContext(sc)
data = (
    sqlContext.read
    .format('csv')  # built-in CSV source ('com.databricks.spark.csv' is a legacy alias)
    .options(header='true', inferSchema='true')  # option keys are case-insensitive
    .load("/databricks-datasets/online_retail/data-001/data.csv")
)

display(data)

## Actions

In [None]:
# Action: run a Spark job that counts the rows of `data`; the notebook displays the number.
data.count()

In [None]:
# Action: fetch the first Row of `data` to the driver and display it.
data.first()

In [None]:
# Action: fetch at most 20 Rows of `data` to the driver as a Python list.
data.take(20)

## Challenge: Action

In [None]:
# Challenge: parallelize a small local list and count its elements (4).
# The list gets its own name so the `data` DataFrame loaded in Method 3 is not
# overwritten -- the Action cells above still need it if they are re-run.
fruits = ['apple', 'orange', 'banana', 'pineapple']
sc.parallelize(fruits).count()

## Transformation

In [None]:
# Reload the online-retail CSV with a header row and inferred column types
# (same reader call as Method 2, using the built-in 'csv' source name), then
# list every country once, sorted ascending.
df = spark.read.load(
    "/databricks-datasets/online_retail/data-001/data.csv",
    format='csv',
    header='true',
    inferSchema='true',
)
df2 = df.select("Country").distinct().orderBy("Country")
display(df2)

In [None]:
# Select: project only the Country column and print the leading rows as text.
df.select(df["Country"]).show()

In [None]:
# Distinct and Order By: each country appears exactly once, in ascending order.
display(
    df.select("Country")   # keep just the Country column
      .distinct()          # remove duplicate rows
      .sort("Country")     # sort() is an alias of orderBy(); ascending by default
)

In [None]:
# Group By: per-invoice sum of UnitPrice * Quantity. The product column is not
# aliased, so Spark auto-generates its name (and the aggregate's name from it).
display(
    df.select(df.InvoiceNo, df.UnitPrice * df.Quantity)
      .groupBy("InvoiceNo")
      .sum()
)

In [None]:
# Filter: print the rows belonging to invoice 536596 (where() is an alias of filter()).
# NOTE(review): if InvoiceNo was inferred as a string, this int comparison relies
# on an implicit cast -- confirm the column type with printSchema().
df.where(df.InvoiceNo == 536596).show()

In [None]:
# Alias: name the computed line total "Total", so the aggregated column is
# called "sum(Total)", then show the ten highest-grossing UK products.
# The Country predicate is applied to `df` itself, before projection and
# grouping, instead of filtering the aggregated frame with a Column that
# belongs to `df`; the rows kept and the totals are the same.
display(
    df
    .filter(df["Country"] == "United Kingdom")
    .select(
        df["Country"],
        df["Description"],
        (df["UnitPrice"] * df["Quantity"]).alias("Total"),
    )
    .groupBy("Country", "Description")
    .sum()
    .sort("sum(Total)", ascending=False)
    .limit(10)
)

In [None]:
# Map: turn every element x into the key/value pair (x, x squared).
a = [1, 2, 1, 4, 2, 3, 4]
b = sc.parallelize(a).map(lambda n: (n, n * n))
print(b.collect())

In [None]:
# ReduceByKey: multiply together all values that share a key.
# Assuming `b` still holds the Map cell's pairs, this gives (1, 1), (2, 16),
# (3, 9) and (4, 256), in no guaranteed order.

c = b.reduceByKey(lambda left, right: left * right)
print(c.collect())

## Challenge: Transformation

In [None]:
# Count the lines that mention "bike", case-insensitively.
# RDD.filter needs an RDD of strings here; `data` in this notebook is not one
# (a Python predicate cannot be passed to its filter), so use the text-file RDD.
# NOTE(review): `rdd` is the most recently loaded text file (clients.csv in the
# cells above) -- confirm this is the file the challenge intends.
numBikes = rdd.filter(lambda s: 'bike' in s.lower()).count()
print(f"Lines with 'bike': {numBikes}")