# Installiamo pyspark

In [1]:
! pip install pyspark

Collecting pyspark
  Downloading pyspark-3.1.2.tar.gz (212.4 MB)
[K     |████████████████████████████████| 212.4 MB 70 kB/s 
[?25hCollecting py4j==0.10.9
  Downloading py4j-0.10.9-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 53.2 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.1.2-py2.py3-none-any.whl size=212880768 sha256=7b5c31f97b94266da0093c7accbda9d6f7dd1ca18e708649492cf77ee6edc949
  Stored in directory: /root/.cache/pip/wheels/a5/0a/c1/9561f6fecb759579a7d863dcd846daaa95f598744e71b02c77
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.1.2


# Importiamo pyspark

In [2]:
import pyspark
import urllib.request

In [3]:
# https://realpython.com/pyspark-intro/

# Inizializziamo il manager di Spark

In [4]:
# local indica che stiamo usando la macchina locale, il * indica che vogliamo usare tutti i processori disponibili
sc = pyspark.SparkContext('local[*]')

# Primo esempio di utilizzo pyspark

Creiamo un file di testo PySpark e contiamo il numero di frasi al suo interno!

In [5]:
txt = sc.textFile('file:////usr/share/doc/python/copyright')
print(txt.collect()) # il metodo collect server per portare nel nodo master il file che si trova nei diversi datanode
print(txt.count()) # il metodo count chiama intrinsicamente il metodo collect e conta gli elementi all'interno del file fornito

316


In [6]:
collected = txt.collect()
print(type(collected)) # è una lista
print(len(collected))

<class 'list'>
316


In [7]:
print(type(txt)) # questo è un RDD, ossia un Resilient Distributed Dataset

<class 'pyspark.rdd.RDD'>


## Filtriamo il testo prendendo le righe che contengono la parola "python"


In [8]:
python_lines = txt.filter(lambda line: 'python' in line.lower())
python_lines_vere = python_lines.collect()
print(len(python_lines_vere))

52


# Word count example

In [9]:
urllib.request.urlretrieve(f"http://www.gutenberg.org/files/100/100-0.txt", "shakespeare.txt")

('shakespeare.txt', <http.client.HTTPMessage at 0x7f9c0a6337d0>)

In [10]:
file_url = "https://raw.githubusercontent.com/words-sdsc/coursera/master/big-data-1/shakespeare.txt"
urllib.request.urlretrieve(file_url, "shakespeare.txt")

('shakespeare.txt', <http.client.HTTPMessage at 0x7f9c0a5d73d0>)

In [11]:
lines = sc.textFile("shakespeare.txt")
print(lines.count())

124456


In [12]:
words = lines.flatMap(lambda line: line.split(' ')) \
                .map(lambda word: word.lower()) \
                .filter(lambda word: len(word) > 0)

In [13]:
from string import punctuation

def strip_punctuation(string):
    return ''.join(char for char in string if char not in punctuation)

stop_words = ["i", "you", "he", "she", "it", "they", "we", "us", "them", "him", "her", "my", "your", "our", "his", "its", "their", "mine", "ours", "yours", "theirs", "hers", "me", "ii","iii","iv","vi","vii","viii","ix","x","xi","xii","xiii","xiv","xv","xvi","xvii","xvii","xviii","xix","xx", "one", "two", "thee", "four", "five", "six", "seven", "eight", "nine", "ten", "will", "thy", "a","able","about","above","abst","accordance","according","accordingly","across","act","actually","added","adj","affected","affecting","affects","after","afterwards","again","against","ah","all","almost","alone","along","already","also","although","always","am","among","amongst","an","and","announce","another","any","anybody","anyhow","anymore","anyone","anything","anyway","anyways","anywhere","apparently","approximately","are","aren","arent","arise","around","as","aside","ask","asking","at","auth","available","away","awfully","b","back","be","became","because","become","becomes","becoming","been","before","beforehand","begin","beginning","beginnings","begins","behind","being","believe","below","beside","besides","between","beyond","biol","both","brief","briefly","but","by","c","ca","came","can","cannot","can't","cause","causes","certain","certainly","co","com","come","comes","contain","containing","contains","could","couldnt","d","date","did","didn't","different","do","does","doesn't","doing","done","don't","down","downwards","due","during","e","each","ed","edu","effect","eg","eight","eighty","either","else","elsewhere","end","ending","enough","especially","et","et-al","etc","even","ever","every","everybody","everyone","everything","everywhere","ex","except","f","far","few","ff","fifth","first","five","fix","followed","following","follows","for","former","formerly","forth","found","four","from","further","furthermore","g","gave","get","gets","getting","give","given","gives","giving","go","goes","gone","got","gotten","h","had","happens","hardly","has","hasn't","have","haven't","having","he","hed","hence","her","here","hereafter","hereby","herein","heres","hereupon","hers","herself","hes","hi","hid","him","himself","his","hither","home","how","howbeit","however","hundred","i","id","ie","if","i'll","im","immediate","immediately","importance","important","in","inc","indeed","index","information","instead","into","invention","inward","is","isn't","it","itd","it'll","its","itself","i've","j","just","k","keep	keeps","kept","kg","km","know","known","knows","l","largely","last","lately","later","latter","latterly","least","less","lest","let","lets","like","liked","likely","line","little","'ll","look","looking","looks","ltd","m","made","mainly","make","makes","many","may","maybe","me","mean","means","meantime","meanwhile","merely","mg","might","million","miss","ml","more","moreover","most","mostly","mr","mrs","much","mug","must","my","myself","n","na","name","namely","nay","nd","near","nearly","necessarily","necessary","need","needs","neither","never","nevertheless","new","next","nine","ninety","no","nobody","non","none","nonetheless","noone","nor","normally","nos","not","noted","nothing","now","nowhere","o","obtain","obtained","obviously","of","off","often","oh","ok","okay","old","omitted","on","once","one","ones","only","onto","or","ord","other","others","otherwise","ought","our","ours","ourselves","out","outside","over","overall","owing","own","p","page","pages","part","particular","particularly","past","per","perhaps","placed","please","plus","poorly","possible","possibly","potentially","pp","predominantly","present","previously","primarily","probably","promptly","proud","provides","put","q","que","quickly","quite","qv","r","ran","rather","rd","re","readily","really","recent","recently","ref","refs","regarding","regardless","regards","related","relatively","research","respectively","resulted","resulting","results","right","run","s","said","same","saw","say","saying","says","sec","section","see","seeing","seem","seemed","seeming","seems","seen","self","selves","sent","seven","several","shall","she","shed","she'll","shes","should","shouldn't","show","showed","shown","showns","shows","significant","significantly","similar","similarly","since","six","slightly","so","some","somebody","somehow","someone","somethan","something","sometime","sometimes","somewhat","somewhere","soon","sorry","specifically","specified","specify","specifying","still","stop","strongly","sub","substantially","successfully","such","sufficiently","suggest","sup","sure	t","take","taken","taking","tell","tends","th","than","thank","thanks","thanx","that","that'll","thats","that've","the","their","theirs","them","themselves","then","thence","there","thereafter","thereby","thered","therefore","therein","there'll","thereof","therere","theres","thereto","thereupon","there've","these","they","theyd","they'll","theyre","they've","think","this","those","thou","though","thoughh","thousand","throug","through","throughout","thru","thus","til","tip","to","together","too","took","toward","towards","tried","tries","truly","try","trying","ts","twice","two","u","un","under","unfortunately","unless","unlike","unlikely","until","unto","up","upon","ups","us","use","used","useful","usefully","usefulness","uses","using","usually","v","value","various","'ve","very","via","viz","vol","vols","vs","w","want","wants","was","wasnt","way","we","wed","welcome","we'll","went","were","werent","we've","what","whatever","what'll","whats","when","whence","whenever","where","whereafter","whereas","whereby","wherein","wheres","whereupon","wherever","whether","which","while","whim","whither","who","whod","whoever","whole","who'll","whom","whomever","whos","whose","why","widely","willing","wish","with","within","without","wont","words","world","would","wouldnt","www","x","y","yes","yet","you","youd","you'll","your","youre","yours","yourself","yourselves","you've","z","zero"]

words = words.map(lambda word: strip_punctuation(word).strip()) \
             .filter(lambda word: not (word in stop_words or word.isdigit())) \
             .cache()

print('Words: ' + ', '.join(words.take(10)) + '...')

Words: 100th, etext, file, presented, project, gutenberg, presented, cooperation, library, library...


In [14]:
wordCounts = words.map(lambda word: (word, 1)).reduceByKey(lambda a,b:a + b)
print(wordCounts.collect())



# Census

## Il nostro primo dataframe

`pandas` è una libreria di analisi e manipolazione dei dati open source veloce, potente, flessibile e facile da usare, costruita sul linguaggio di programmazione Python.

Permette di manipolare diversi tipi di dato tabellare (es. tabelle excel, file CSV).

Supporta costrutti sia di tipo SQL e NOSQL che scopriremo nelle prossime lezioni.

In [15]:
import pandas as pd # importiamo pandas e rinominiamolo come pd

In [16]:
file_url = "https://raw.githubusercontent.com/words-sdsc/coursera/master/big-data-2/csv/census.csv"
urllib.request.urlretrieve(file_url, "census.csv")

('census.csv', <http.client.HTTPMessage at 0x7f9c0a115110>)

In [17]:
census_dataframe = pd.read_csv("census.csv",encoding='latin-1') # leggiamo il file csv e convertiamolo in un dataframe

In [18]:
census_dataframe # guardiamo il dataframe

Unnamed: 0,SUMLEV,REGION,DIVISION,STATE,COUNTY,STNAME,CTYNAME,CENSUS2010POP,ESTIMATESBASE2010,POPESTIMATE2010,POPESTIMATE2011,POPESTIMATE2012,POPESTIMATE2013,POPESTIMATE2014,POPESTIMATE2015,NPOPCHG_2010,NPOPCHG_2011,NPOPCHG_2012,NPOPCHG_2013,NPOPCHG_2014,NPOPCHG_2015,BIRTHS2010,BIRTHS2011,BIRTHS2012,BIRTHS2013,BIRTHS2014,BIRTHS2015,DEATHS2010,DEATHS2011,DEATHS2012,DEATHS2013,DEATHS2014,DEATHS2015,NATURALINC2010,NATURALINC2011,NATURALINC2012,NATURALINC2013,NATURALINC2014,NATURALINC2015,INTERNATIONALMIG2010,...,RESIDUAL2013,RESIDUAL2014,RESIDUAL2015,GQESTIMATESBASE2010,GQESTIMATES2010,GQESTIMATES2011,GQESTIMATES2012,GQESTIMATES2013,GQESTIMATES2014,GQESTIMATES2015,RBIRTH2011,RBIRTH2012,RBIRTH2013,RBIRTH2014,RBIRTH2015,RDEATH2011,RDEATH2012,RDEATH2013,RDEATH2014,RDEATH2015,RNATURALINC2011,RNATURALINC2012,RNATURALINC2013,RNATURALINC2014,RNATURALINC2015,RINTERNATIONALMIG2011,RINTERNATIONALMIG2012,RINTERNATIONALMIG2013,RINTERNATIONALMIG2014,RINTERNATIONALMIG2015,RDOMESTICMIG2011,RDOMESTICMIG2012,RDOMESTICMIG2013,RDOMESTICMIG2014,RDOMESTICMIG2015,RNETMIG2011,RNETMIG2012,RNETMIG2013,RNETMIG2014,RNETMIG2015
0,40,3,6,1,0,Alabama,Alabama,4779736,4780127,4785161,4801108,4816089,4830533,4846411,4858979,5034,15947,14981,14444,15878,12568,14226,59689,59062,57938,58334,58305,11089,48811,48357,50843,50228,50330,3137,10878,10705,7095,8106,7975,1357,...,677,-573,1135,116185,116212,115560,115666,116963,119088,119599,12.453020,12.282581,12.012080,12.056286,12.014973,10.183524,10.056360,10.541099,10.380963,10.371556,2.269496,2.226220,1.470981,1.675322,1.643417,1.027720,1.019840,1.002216,1.142716,1.179963,0.002295,-0.193196,0.381066,0.582002,-0.467369,1.030015,0.826644,1.383282,1.724718,0.712594
1,50,3,6,1,1,Alabama,Autauga County,54571,54571,54660,55253,55175,55038,55290,55347,89,593,-78,-137,252,57,151,636,615,574,623,600,152,507,558,583,504,467,-1,129,57,-9,119,133,33,...,22,-10,45,455,455,455,455,455,455,455,11.572789,11.138479,10.416194,11.293597,10.846281,9.225478,10.106133,10.579514,9.136393,8.442022,2.347311,1.032347,-0.163320,2.157204,2.404259,0.363924,0.289782,0.290347,0.326300,0.343466,7.242091,-2.915927,-3.012349,2.265971,-2.530799,7.606016,-2.626146,-2.722002,2.592270,-2.187333
2,50,3,6,1,3,Alabama,Baldwin County,182265,182265,183193,186659,190396,195126,199713,203709,928,3466,3737,4730,4587,3996,517,2187,2092,2160,2186,2240,532,1825,1879,1902,2044,1992,-15,362,213,258,142,248,69,...,91,434,58,2307,2307,2307,2249,2304,2308,2309,11.826352,11.096524,11.205586,11.072868,11.104997,9.868812,9.966716,9.867141,10.353587,9.875515,1.957540,1.129809,1.338445,0.719281,1.229482,1.011215,0.912334,0.881921,1.073855,1.095627,14.832960,17.647293,21.845705,19.243287,17.197872,15.844176,18.559627,22.727626,20.317142,18.293499
3,50,3,6,1,5,Alabama,Barbour County,27457,27457,27341,27226,27159,26973,26815,26489,-116,-115,-67,-186,-158,-326,70,335,300,283,260,269,128,319,291,294,310,309,-58,16,9,-11,-50,-40,2,...,19,-1,-5,3193,3193,3382,3388,3389,3353,3352,12.278483,11.032454,10.455923,9.667584,10.093051,11.692048,10.701480,10.862337,11.526735,11.593877,0.586435,0.330974,-0.406414,-1.859151,-1.500825,-0.146609,-0.257424,-0.110840,-0.074366,0.000000,-4.728132,-2.500690,-7.056824,-3.904217,-10.543299,-4.874741,-2.758113,-7.167664,-3.978583,-10.543299
4,50,3,6,1,7,Alabama,Bibb County,22915,22919,22861,22733,22642,22512,22549,22583,-58,-128,-91,-130,37,34,44,266,245,259,247,253,34,278,237,281,211,223,10,-12,8,-22,36,30,2,...,14,-16,-21,2224,2224,2224,2224,2224,2233,2236,11.668202,10.798898,11.471852,10.962917,11.211557,12.194587,10.446281,12.446295,9.365083,9.882124,-0.526385,0.352617,-0.974443,1.597834,1.329434,0.438654,0.705234,0.797272,0.932070,0.930604,-5.527043,-5.068871,-6.201001,-0.177537,0.177258,-5.088389,-4.363636,-5.403729,0.754533,1.107861
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
3188,50,4,8,56,37,Wyoming,Sweetwater County,43806,43806,43593,44041,45104,45162,44925,44626,-213,448,1063,58,-237,-299,167,640,595,657,629,620,76,251,273,296,246,262,91,389,322,361,383,358,5,...,-64,14,-27,679,679,694,697,731,671,672,14.606203,13.349038,14.556976,13.964279,13.846858,5.728370,6.124853,6.558394,5.461387,5.851414,8.877833,7.224185,7.998582,8.502892,7.995444,0.182578,0.000000,0.044313,0.177606,0.178669,1.072643,16.243199,-5.339774,-14.252889,-14.248864,1.255221,16.243199,-5.295460,-14.075283,-14.070195
3189,50,4,8,56,39,Wyoming,Teton County,21294,21294,21297,21482,21697,22347,22905,23125,3,185,215,650,558,220,76,259,230,261,249,269,10,87,61,97,68,76,66,172,169,164,181,193,5,...,20,8,-8,271,271,271,270,268,268,267,12.108745,10.653327,11.851785,11.005038,11.688030,4.067416,2.825448,4.404686,3.005392,3.302194,8.041329,7.827879,7.447098,7.999646,8.385835,2.244092,1.435883,1.634729,2.165650,2.085596,-1.589565,0.972695,19.525929,14.143021,-0.564849,0.654527,2.408578,21.160658,16.308671,1.520747
3190,50,4,8,56,41,Wyoming,Uinta County,21118,21118,21102,20912,20989,21022,20903,20822,-16,-190,77,33,-119,-81,73,324,311,316,316,316,49,139,115,136,130,137,24,185,196,180,186,179,2,...,11,4,3,270,270,245,236,254,254,254,15.423430,14.844514,15.043679,15.074538,15.146794,6.616842,5.489129,6.474495,6.201550,6.566806,8.806588,9.355385,8.569184,8.872987,8.579988,-0.380825,-0.620510,-0.618886,-0.524747,-0.479329,-17.755986,-4.916350,-6.902954,-14.215862,-12.127022,-18.136812,-5.536861,-7.521840,-14.740608,-12.606351
3191,50,4,8,56,43,Wyoming,Washakie County,8533,8533,8545,8469,8443,8443,8316,8328,12,-76,-26,0,-127,12,26,108,90,95,96,90,34,79,105,77,70,79,-8,29,-15,18,26,11,1,...,1,-2,-11,140,140,140,140,140,140,140,12.695427,10.643330,11.251925,11.456531,10.814708,9.286470,12.417219,9.119981,8.353720,9.492910,3.408957,-1.773888,2.131944,3.102810,1.321798,-0.352651,-0.354778,-0.236883,-0.238678,-0.240327,-11.637475,-0.827815,-2.013502,-17.781491,1.682288,-11.990126,-1.182592,-2.250385,-18.020168,1.441961


## Convertiamo il dataframe pandas in un DataFrame di Spark

In [31]:
from pyspark.sql import SparkSession
#Create PySpark SparkSession
spark = SparkSession.builder \
    .master("local[*]") \
    .getOrCreate()
#Create PySpark DataFrame from Pandas
sparkDF=spark.createDataFrame(census_dataframe) 
print(type(sparkDF))
sparkDF.show()


<class 'pyspark.sql.dataframe.DataFrame'>
+------+------+--------+-----+------+-------+---------------+-------------+-----------------+---------------+---------------+---------------+---------------+---------------+---------------+------------+------------+------------+------------+------------+------------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+--------------+--------------+--------------+--------------+--------------+--------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+---------------+---------------+---------------+---------------+---------------+---------------+----------+----------+----------+----------+----------+----------+------------+------------+------------+------------+------------+------------+-------------------+---------------+---------------+---------------+---------------+---------------+---

### Filtriamo i record dello stato delle hawaii

In [35]:
sparkDF.filter(sparkDF["STNAME"] == "Hawaii").show(truncate=False)

+------+------+--------+-----+------+------+---------------+-------------+-----------------+---------------+---------------+---------------+---------------+---------------+---------------+------------+------------+------------+------------+------------+------------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+--------------+--------------+--------------+--------------+--------------+--------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+---------------+---------------+---------------+---------------+---------------+---------------+----------+----------+----------+----------+----------+----------+------------+------------+------------+------------+------------+------------+-------------------+---------------+---------------+---------------+---------------+---------------+---------------+------------------+-----------

### Calcoliamo le morti nel 2015 per ogni stato

In [20]:
# Mostriamo le morti nel 2015 per ogni stato
sparkDF.groupBy("STNAME").sum("RDEATH2015").show(truncate=False)

+--------------------+------------------+
|STNAME              |sum(RDEATH2015)   |
+--------------------+------------------+
|Utah                |181.95991247670003|
|Hawaii              |41.8654982376     |
|Minnesota           |815.3963623359003 |
|Ohio                |892.5382745586002 |
|Arkansas            |871.6195492520001 |
|Oregon              |358.7234076833    |
|Texas               |2250.3133802321995|
|North Dakota        |511.41775998720004|
|Pennsylvania        |727.7267292704001 |
|Connecticut         |74.2072730157     |
|Nebraska            |870.1029771498002 |
|Vermont             |132.2155256929    |
|Nevada              |155.22245142500003|
|Washington          |353.5000305705    |
|Illinois            |1069.0199218407   |
|Oklahoma            |869.8685865387998 |
|District of Columbia|15.6689168088     |
|Delaware            |37.4628661323     |
|Alaska              |197.01206453409992|
|New Mexico          |306.2188892200999 |
+--------------------+------------

### Calcoliamo le morti medie per città nel 2015 per ogni stato

In [21]:
sparkDF.groupBy("STNAME").mean("RDEATH2015").show(truncate=False)

+--------------------+------------------+
|STNAME              |avg(RDEATH2015)   |
+--------------------+------------------+
|Utah                |6.065330415890001 |
|Hawaii              |6.9775830396      |
|Minnesota           |9.26586775381705  |
|Ohio                |10.028519938860676|
|Arkansas            |11.46867827963158 |
|Oregon              |9.695227234683784 |
|Texas               |8.824758353851763 |
|North Dakota        |9.470699259022224 |
|Pennsylvania        |10.701863665741179|
|Connecticut         |8.245252557299999 |
|Nebraska            |9.25641465052979  |
|Vermont             |8.814368379526666 |
|Nevada              |8.623469523611114 |
|Washington          |8.837500764262499 |
|Illinois            |10.378834192628155|
|Oklahoma            |11.15216136588205 |
|District of Columbia|7.8344584044      |
|Delaware            |9.365716533075    |
|Alaska              |6.567068817803331 |
|New Mexico          |9.006437918238234 |
+--------------------+------------

# Proviamo le potenzialità di pyspark...?

Confrontiamo il tempo di elaborazione sequenziale con uno parallelo..in locale!

In [22]:
import time

In [23]:
big_list = range(50000000) # lista di 50000000 elementi

In [24]:
# Sequenzialmente, filtriamo solamente gli elementi dispari!
start_time = time.time()
elementi_dispari = [x for x in big_list if x%2!=0]
print("Total time: ", time.time() - start_time)

Total time:  5.503225803375244


In [25]:
from pyspark.context import SparkContext

In [26]:
sc = SparkContext.getOrCreate() #re-inizializzo il contesto di spark

In [27]:
start_time = time.time()
rdd = sc.parallelize(big_list, 2)
odds = rdd.filter(lambda x: x % 2 != 0)
elementi_dispari = odds.collect()
print("Total time: ", time.time() - start_time)

Total time:  19.322924613952637
