## Predefined Spark variables sc, spark and sqlContext

In [None]:
# Display the predefined `sc` object (presumably the SparkContext supplied by the notebook environment).
sc

In [None]:
# Display the predefined `sqlContext` object (presumably the SQL entry point supplied by the notebook environment).
sqlContext

## Parallelize a python data structure

In [None]:
# Distribute the integers 0..999 across 10 partitions.
myRDD = sc.parallelize(range(1000), numSlices=10)

In [None]:
# Number of partitions backing myRDD.
myRDD.getNumPartitions()

In [None]:
# Fetch the first 5 elements of myRDD.
myRDD.take(5)

## Rename RDD and set persistence level

In [None]:
# Import only the name that is used instead of a wildcard import.
from pyspark.storagelevel import StorageLevel

# Give the RDD a readable name and mark it for persistence.
# MEMORY_AND_DISK is used in place of the deprecated MEMORY_AND_DISK_SER
# constant: in PySpark records are always serialized, so the deprecated
# constant was only an alias for this level.
myRDD.setName("myRDD").persist(StorageLevel.MEMORY_AND_DISK)

In [None]:
# Double every element, keep the results below 10, and collect them to the driver.
# Persistence is lazy, so running an action like this is what materializes myRDD.
tmpMyRDD = (
    myRDD
    .map(lambda n: n * 2)
    .filter(lambda n: n < 10)
    .collect()
)

Check the Spark UI -> Storage tab to see myRDD

## How do partitions work?

In [None]:
# glom() turns each partition into a single list, so collect() returns one list per partition.
allNums = myRDD.glom().collect()

In [None]:
# Number of collected lists (one per partition of myRDD).
len(allNums)

In [None]:
# Contents of the first two partitions.
allNums[:2]

## Loading the Twitter dataset

In [None]:
from pyspark.sql.types import *
import time

In [None]:
# Read the tweets file as an RDD of text lines, asking for at least 20 partitions.
tweetsCSV = sc.textFile(
    "wasb://mie451datasets@mie451files.blob.core.windows.net/tweets2009-06-0115.csv",
    minPartitions=20,
)

In [None]:
# Peek at the first 5 raw lines of the file.
tweetsCSV.take(5)

In [None]:
# Number of partitions the file was actually split into.
tweetsCSV.getNumPartitions()

In [None]:
def isEnglish(s):
    """Return True when `s` can be encoded as ASCII, False otherwise.

    Despite the name, no language detection is performed: the check only
    verifies that every character of `s` is an ASCII character.
    """
    try:
        s.encode('ascii')
        return True
    except UnicodeEncodeError:
        # At least one character falls outside the ASCII range.
        return False

# RDD transformations: parse the data in tweetsCSV.
tweets = (
    tweetsCSV
    # Keep only the lines made up entirely of ASCII characters.
    .filter(lambda line: isEnglish(line))
    # Split each line into its tab-separated fields.
    .map(lambda line: line.split("\t"))
    # Drop rows whose first field is "date" (presumably the header) and rows without exactly 3 fields.
    .filter(lambda fields: fields[0] != "date" and len(fields) == 3)
    # Convert every field to str and pack the row as a 3-tuple.
    .map(lambda fields: (str(fields[0]), str(fields[1]), str(fields[2])))
)

In [None]:
# Peek at the first 5 parsed tuples.
tweets.take(5)

In [None]:
# Number of partitions of the parsed RDD.
tweets.getNumPartitions()

## Unoptimized Code

In [None]:
# Time grouping the tweets by their second field and counting each group.
time1 = time.time()
tweetCounts = (
    tweets
    .groupBy(lambda row: row[1])
    .map(lambda group: (group[0], len(group[1])))
    .collect()
)
time2 = time.time()
print((time2 - time1) * 1000.0)  # elapsed wall-clock time in milliseconds

In [None]:
# Time grouping the tweets by their second field and taking the first record of each group.
time1 = time.time()
tweetCounts = (
    tweets
    .groupBy(lambda row: row[1])
    .map(lambda group: (group[0], group[1].data[0]))
    .collect()
)
time2 = time.time()
print((time2 - time1) * 1000.0)  # elapsed wall-clock time in milliseconds

In [None]:
# Time grouping the tweets by their second field and taking the last record of each group.
time1 = time.time()
tweetCounts = (
    tweets
    .groupBy(lambda row: row[1])
    .map(lambda group: (group[0], group[1].data[-1]))
    .collect()
)
time2 = time.time()
print((time2 - time1) * 1000.0)  # elapsed wall-clock time in milliseconds

## Better partitioning

In [None]:
# Group the tweets by their second field, then spread the groups over 20 partitions.
partitioned = (
    tweets
    .groupBy(lambda row: row[1])
    .partitionBy(numPartitions=20)
)

In [None]:
# Time counting the records of each group in `partitioned` (the collected result is discarded).
time1 = time.time()
(
    partitioned
    .map(lambda group: (group[0], len(group[1])))
    .collect()
)
time2 = time.time()
print((time2 - time1) * 1000.0)  # elapsed wall-clock time in milliseconds

In [None]:
# Time taking the first record of each group in `partitioned` (the collected result is discarded).
time1 = time.time()
(
    partitioned
    .map(lambda group: (group[0], group[1].data[0]))
    .collect()
)
time2 = time.time()
print((time2 - time1) * 1000.0)  # elapsed wall-clock time in milliseconds

In [None]:
# Time taking the last record of each group in `partitioned` (the collected result is discarded).
time1 = time.time()
(
    partitioned
    .map(lambda group: (group[0], group[1].data[-1]))
    .collect()
)
time2 = time.time()
print((time2 - time1) * 1000.0)  # elapsed wall-clock time in milliseconds

## Using Cache

In [None]:
# Group the tweets by their second field.
groupedRDD = tweets.groupBy(lambda row: row[1])

# Mark the grouped RDD for caching so that later actions can reuse it.
groupedRDD.cache()

In [None]:
# Time counting the records of each group in the cached groupedRDD.
time1 = time.time()
tweetCounts = (
    groupedRDD
    .map(lambda group: (group[0], len(group[1])))
    .collect()
)
time2 = time.time()
print((time2 - time1) * 1000.0)  # elapsed wall-clock time in milliseconds

In [None]:
# Time taking the first record of each group in the cached groupedRDD.
time1 = time.time()
tweetCounts = (
    groupedRDD
    .map(lambda group: (group[0], group[1].data[0]))
    .collect()
)
time2 = time.time()
print((time2 - time1) * 1000.0)  # elapsed wall-clock time in milliseconds

In [None]:
# Time taking the last record of each group in the cached groupedRDD.
time1 = time.time()
tweetCounts = (
    groupedRDD
    .map(lambda group: (group[0], group[1].data[-1]))
    .collect()
)
time2 = time.time()
print((time2 - time1) * 1000.0)  # elapsed wall-clock time in milliseconds

## DataFrames

In [None]:
# Schema for the DataFrame: three string columns, each declared non-nullable.
tweetsSchema = StructType([
    StructField(columnName, StringType(), False)
    for columnName in ("date", "user", "tweet")
])

# Build the DataFrame from the parsed tweets RDD using that schema.
tweetsDF = sqlContext.createDataFrame(tweets, tweetsSchema)

In [None]:
# Evaluate tweetsDF so the notebook displays its representation.
tweetsDF

In [None]:
# Print the column names, types and nullability of tweetsDF.
tweetsDF.printSchema()

In [None]:
# Print the first rows of tweetsDF as a text table.
tweetsDF.show()

## Working with SQL

In [None]:
# Expose tweetsDF to SQL queries under the name "tweets".
# createOrReplaceTempView replaces registerTempTable, which is deprecated since Spark 2.0.
tweetsDF.createOrReplaceTempView("tweets")

In [None]:
# Query the "tweets" table with SQL and print 5 rows.
sqlContext.sql("SELECT * FROM tweets LIMIT 5").show()

In [None]:
# Count the rows per user with a SQL aggregation; the result has columns user and tweetCount.
summarizedDF = sqlContext.sql("SELECT user, COUNT(1) AS tweetCount from tweets GROUP BY user")

In [None]:
# Print the first rows of the per-user counts.
summarizedDF.show()

## Using SparkSQL functions

In [None]:
import pyspark.sql.functions as sf

# Per-user row count expressed with the DataFrame API instead of a SQL string.
(
    tweetsDF
    .groupBy("user")
    .agg(sf.count("*").alias("tweetCount"))
    .select("user", "tweetCount")
    .show()
)

## Joining

In [None]:
# DataFrame.join() performs an INNER JOIN by default.
combined = (
    tweetsDF
    .join(summarizedDF, tweetsDF.user == summarizedDF.user)
    .select("date", tweetsDF.user, "tweet", "tweetCount")
)
combined.show()

### And as a SQL query:

In [None]:
# Expose summarizedDF to SQL queries under the name "summarized".
# createOrReplaceTempView replaces registerTempTable, which is deprecated since Spark 2.0.
summarizedDF.createOrReplaceTempView("summarized")

In [None]:
# Join the "tweets" and "summarized" tables on user with SQL and print the first rows.
sqlContext.sql("""
SELECT tweets.date, tweets.user, tweets.tweet, summarized.tweetCount
FROM tweets INNER JOIN summarized on tweets.user = summarized.user
""").show()

## Caching

In [None]:
# Storage level before caching.
# Result format: (useDisk, useMemory, useOffHeap, deserialized, replication)
summarizedDF.storageLevel

In [None]:
# String form of the storage level before caching.
str(summarizedDF.storageLevel)

In [None]:
# Cache with the default storage level; per the original note this is equivalent to
# .persist(MEMORY_AND_DISK). NOTE(review): the default level may differ between Spark versions.
summarizedDF.cache()

In [None]:
# Storage level after cache() was called.
summarizedDF.storageLevel

In [None]:
# String form of the storage level after cache() was called.
str(summarizedDF.storageLevel)

## Extracting Data

In [None]:
# Bring the first 5 rows to the driver (use collect() instead to fetch the whole DataFrame).
rows = tweetsDF.take(5)
rows

In [None]:
# First of the fetched rows.
rows[0]

In [None]:
# A column value can be read from a row as an attribute.
rows[0].user

In [None]:
# Convert the row to a dictionary.
rows[0].asDict()

### Alternative: Export to Pandas

In [None]:
# Limit to 5 rows before converting, so only those rows are exported to pandas.
pdDF = tweetsDF.limit(5).toPandas()
pdDF

In [None]:
# Show the type of the exported object.
type(pdDF)

## Saving processed dataframe

In [None]:
# Write the DataFrame in parquet format; other formats such as json can be used as well.
tweetsDF.write.format('parquet').save("/path/to/output")

In [None]:
# Read parquet data from the given path into a new DataFrame.
loadedDF = sqlContext.read.parquet("/path/to/output")