## How to create parallelized collections

In [1]:
# Source data: the integers 1..10000, held as a lazy Python range
data = range(1, 10001)

# Printing a range shows its representation (range(1, 10001)), not the individual numbers
print(data)

# SparkContext.parallelize() turns a local collection into an RDD spread across partitions
ds_rdd = sc.parallelize(data, numSlices=5)  # explicitly request 5 partitions
# (key, value) pairs with repeated keys
ds_rdd_1 = sc.parallelize([('a', 1), ('a', 2), ('b', 4), ('b', 1), ('c', 3), ('c', 3)])
# (key, list-of-values) pairs
ds_rdd_2 = sc.parallelize([('a', [1, 2, 3, 4]), ('b', [5, 6, 7])])
# the integers 1..10
ds_rdd_3 = sc.parallelize(range(1, 11))
# a subset of the pairs held in ds_rdd_1
ds_rdd_4 = sc.parallelize([('b', 1), ('c', 3), ('c', 3)])

# collect() is an action: it triggers execution and returns every element to the driver,
# so only the first 10 elements of the large RDD are printed
print(ds_rdd.collect()[:10])
# The small RDDs are printed in full
for small_rdd in (ds_rdd_1, ds_rdd_2, ds_rdd_3):
    print(small_rdd.collect())

## Using Help command...

In [2]:
# Show the built-in documentation (docstring) of SparkContext.parallelize
help(sc.parallelize)

## Basic information

In [3]:
# Each entry pairs a label with the RDD method that produces the value.
# Methods are called one at a time inside the loop, right before printing.
basic_info = [
    # number of partitions the RDD is split into
    ("Number of RDD partitions: ", ds_rdd.getNumPartitions),
    # number of elements in the RDD
    ("Number of instances: ", ds_rdd.count),
    # sum of all elements
    ("Sum of Rdd instances: ", ds_rdd.sum),
    # number of elements per key
    ("Number of instances by key: ", ds_rdd_1.countByKey),
    # number of occurrences of each distinct element (here: each (key, value) pair)
    ("Number of instances by value: ", ds_rdd_1.countByValue),
    # dictionary of key -> value; when a key is duplicated only one of its values is kept
    ("Dictionary with key,value pairs: ", ds_rdd_1.collectAsMap),
]
for label, action in basic_info:
    print(label, action())

## Getting summary stats

In [4]:
# Each entry pairs a label with a callable computing one statistic of ds_rdd.
# Callables are invoked one at a time inside the loop, right before printing.
summary_stats = [
    # smallest element
    ("Min. value: ", ds_rdd.min),
    # largest element
    ("Max. value: ", ds_rdd.max),
    # arithmetic mean of the elements
    ("Mean value: ", ds_rdd.mean),
    # standard deviation of the elements
    ("Stdev. value: ", ds_rdd.stdev),
    # variance of the elements
    ("Variance value: ", ds_rdd.variance),
    # histogram of the elements using 10 bins
    ("Histogram: ", lambda: ds_rdd.histogram(10)),
    # all summary statistics at once
    ("Summary stats.: ", ds_rdd.stats),
]
for label, action in summary_stats:
    print(label, action())

## Applying lambda functions/Iterating

In [5]:
# map(): apply a function to every element (here: double it).
# take(10) fetches only the first 10 results instead of collecting all 10,000 to the driver.
print(ds_rdd.map(lambda x: x * 2).take(10))
# flatMap(): apply a function to every element and flatten the results into a single list
print(ds_rdd_1.flatMap(lambda x: x).collect())
# flatMapValues(): flatten the value of each (key, value) pair while keeping its key
print(ds_rdd_2.flatMapValues(lambda x: x).collect())


def print_el(x):
    """Print a single RDD element."""
    print(x)


# foreach(): call a function once per element, purely for its side effect.
# The small 10-element RDD is used here so the demo does not emit 10,000 lines of output.
ds_rdd_3.foreach(print_el)


## Selecting/Filtering data

In [6]:
# Each callable demonstrates one way of selecting data from an RDD.
# They are invoked one at a time inside the loop, right before printing.
selection_demos = [
    # collect() returns all elements as a list (only the first 10 are shown)
    lambda: ds_rdd.collect()[:10],
    # take(n) returns the first n elements
    lambda: ds_rdd.take(10),
    # first() returns the first element
    lambda: ds_rdd.first(),
    # top(n) returns the n largest elements, in descending order
    lambda: ds_rdd.top(5),
    # sample() returns a random subset; the seed makes it repeatable
    lambda: ds_rdd.sample(withReplacement=False, fraction=0.001, seed=1).collect(),
    # filter() keeps the elements satisfying a condition (here: tuples containing 'b')
    lambda: ds_rdd_1.filter(lambda pair: 'b' in pair).collect(),
    # distinct() removes duplicate elements
    lambda: ds_rdd_1.distinct().collect(),
    # keys() returns the key of every (key, value) pair
    lambda: ds_rdd_1.keys().collect(),
    # values() returns the value of every (key, value) pair
    lambda: ds_rdd_1.values().collect(),
]
for demo in selection_demos:
    print(demo())

## Reducing/Grouping

In [7]:
# reduceByKey(): combine the values of each key with the given function (here: sum them)
totals_by_key = ds_rdd_1.reduceByKey(lambda acc, val: acc + val)
print(totals_by_key.collect())

# reduce(): combine all elements into one result; the elements are tuples,
# so `+` concatenates them into a single long tuple
merged_pairs = ds_rdd_1.reduce(lambda left, right: left + right)
print(merged_pairs)

# groupBy(): group elements by the result of a function (here: remainder of division by 2)
grouped_by_parity = ds_rdd_3.groupBy(lambda n: n % 2).mapValues(list)
print(grouped_by_parity.collect())

# groupByKey(): group the values of each key together
grouped_by_key = ds_rdd_1.groupByKey().mapValues(list)
print(grouped_by_key.collect())

## Mathematical operations/Sorting

In [8]:
# subtract(): elements of ds_rdd_1 that do not appear in ds_rdd_4
only_in_first = ds_rdd_1.subtract(ds_rdd_4)
print(only_in_first.collect())

# subtractByKey(): pairs of ds_rdd_1 whose key does not appear in ds_rdd_4
keys_only_in_first = ds_rdd_1.subtractByKey(ds_rdd_4)
print(keys_only_in_first.collect())

# cartesian(): every combination of an element of ds_rdd_1 with an element of ds_rdd_4
all_combinations = ds_rdd_1.cartesian(ds_rdd_4)
print(all_combinations.collect())

# sortBy(): sort by the result of a function (here: the value of each pair)
sorted_by_value = ds_rdd_1.sortBy(lambda pair: pair[1])
print(sorted_by_value.collect())

# sortByKey(): sort (key, value) pairs by key
sorted_by_key = ds_rdd_1.sortByKey()
print(sorted_by_key.collect())

## Loading file to dataframe/performing queries

In [9]:
# Read the world-cities CSV into a DataFrame:
# the first row is a header, fields are comma-separated, and column types are inferred
df = (
    spark.read.format("csv")
    .options(inferSchema="true", header="true", sep=",")
    .load("/FileStore/tables/worldcities.csv")
)

# Render the DataFrame as a table in the notebook
display(df)

city,city_ascii,lat,lng,country,iso2,iso3,admin_name,capital,population,id
Malishevë,Malisheve,42.4822,20.7458,Kosovo,XK,XKS,Malishevë,admin,,1901597212
Prizren,Prizren,42.2139,20.7397,Kosovo,XK,XKS,Prizren,admin,,1901360309
Zubin Potok,Zubin Potok,42.9144,20.6897,Kosovo,XK,XKS,Zubin Potok,admin,,1901608808
Kamenicë,Kamenice,42.5781,21.5803,Kosovo,XK,XKS,Kamenicë,admin,,1901851592
Viti,Viti,42.3214,21.3583,Kosovo,XK,XKS,Viti,admin,,1901328795
Shtërpcë,Shterpce,42.2394,21.0272,Kosovo,XK,XKS,Shtërpcë,admin,,1901828239
Shtime,Shtime,42.4331,21.0397,Kosovo,XK,XKS,Shtime,admin,,1901598505
Vushtrri,Vushtrri,42.8231,20.9675,Kosovo,XK,XKS,Vushtrri,admin,,1901107642
Dragash,Dragash,42.0265,20.6533,Kosovo,XK,XKS,Dragash,admin,,1901112530
Podujevë,Podujeve,42.9111,21.1899,Kosovo,XK,XKS,Podujevë,admin,,1901550082


In [10]:
# Print the DataFrame schema
df.printSchema()

In [11]:
# Register the DataFrame as a temporary view named "worldcities_csv" so it can be queried with SQL
df.createOrReplaceTempView("worldcities_csv")

In [12]:
%sql
-- List the available tables and views, including temporary ones
show tables

database,tableName,isTemporary
default,cleaned_taxes,False
,worldcities_csv,True


In [13]:
%sql
/* Query the created view: cities with more than 10 million inhabitants, largest first.
   ORDER BY guarantees a total ordering of the result;
   SORT BY would only sort rows within each partition. */
select * from `worldcities_csv` where population>10000000
order by population desc

city,city_ascii,lat,lng,country,iso2,iso3,admin_name,capital,population,id
Tokyo,Tokyo,35.685,139.7514,Japan,JP,JPN,Tōkyō,primary,35676000.0,1392685764
New York,New York,40.6943,-73.9249,United States,US,USA,New York,,19164071.0,1840034016
Mexico City,Mexico City,19.4424,-99.131,Mexico,MX,MEX,Ciudad de México,primary,19028000.0,1484247881
Mumbai,Mumbai,19.017,72.857,India,IN,IND,Mahārāshtra,admin,18978000.0,1356226629
São Paulo,Sao Paulo,-23.5587,-46.625,Brazil,BR,BRA,São Paulo,admin,18845000.0,1076532519
Delhi,Delhi,28.67,77.23,India,IN,IND,Delhi,admin,15926000.0,1356872604
Shanghai,Shanghai,31.2165,121.4365,China,CN,CHN,Shanghai,admin,14987000.0,1156073548
Kolkata,Kolkata,22.495,88.3247,India,IN,IND,West Bengal,admin,14787000.0,1356060520
Dhaka,Dhaka,23.7231,90.4086,Bangladesh,BD,BGD,Dhaka,primary,12797394.0,1050529279
Buenos Aires,Buenos Aires,-34.6025,-58.3975,Argentina,AR,ARG,"Buenos Aires, Ciudad Autónoma de",primary,12795000.0,1032717330


## Spark in use - Find the 10 most common words in the "Pan Tadeusz" poem (ignoring words shorter than 4 letters)

In [14]:
import re

# Read the poem as an RDD of text lines
lines = sc.textFile("/FileStore/tables/pan_tadeusz_ansi.txt")
# Lower-case each line, strip everything except a-z and spaces, then split into words.
# NOTE(review): this regex also drops accented letters, so Polish words containing
# diacritics are shortened rather than kept intact - confirm this is intended.
words = lines.flatMap(lambda x: re.sub(r'[^a-z ]+', '', x.lower().replace('\n', ' ')).split(' '))
# Classic word count: emit (word, 1) and sum the ones per word
pairs = words.map(lambda x: (x, 1))
count = pairs.reduceByKey(lambda x, y: x + y)
# Keep words longer than 3 letters, sort by frequency (descending) and fetch only the
# top 10 with take() instead of collecting the whole table to the driver and slicing it
top_words = count.filter(lambda x: len(x[0]) > 3).sortBy(lambda x: x[1], False).take(10)
print(top_words)
