# Wordcount example

This notebook shows the classic wordcount example in which we want to calculate how many time the same word appears within a text.
The example also show how to use basic PySpark tools like DataFrame and SQL.

In [1]:
# To find out where the pyspark
import findspark
findspark.init()

In [2]:
# Creating Spark Context
from pyspark import SparkContext
sc = SparkContext("local", "Wordcount")

With the step below we are going to read an input (local) file that will be our data source. textFile() and wholeTextFiles() methods to read into RDD that are the low level data access of Spark (there exist other method to read directly in Dataframe).

Each line of the text file is a *row*. We can apply a series of chained operation:
1. flatMap produces a new dataset <word> from the splitting
2. map produces a new dataset in the form <word, couunt>
3. reduceByKey coordinates the aggregation by summing rows with the same key

In [3]:
# Calculating words count
text_file = sc.textFile("doveconviene_info.txt")
counts = text_file.flatMap(lambda line: line.split(" ")) \
             .map(lambda word: (word, 1)) \
             .reduceByKey(lambda a, b: a + b)

We now print some basec statistics about words and their occurrences

In [4]:
# Printing each word with its respective count
output = counts.collect()
for (word, occurs) in output:
    print("%s: %i" % (word, occurs))

brand-logo: 2
Homepage: 2
Chi: 5
siamo: 5
Soluzioni: 2
Lavora: 4
con: 11
noi: 4
Contatti: 2
Newsroom: 2
Scopri: 5
le: 9
offerte: 5
Lo: 2
shopping: 6
è: 4
la: 11
nostra: 4
passione.: 2
Vogliamo: 2
arricchire: 2
questa: 2
esperienza: 2
dalla: 2
fase: 2
di: 30
preparazione: 2
su: 3
mobile: 3
fino: 4
al: 5
momento: 2
della: 2
visita: 2
in: 21
negozio.: 5
: 49
Shopping: 1
Experts: 1
a: 7
partire: 1
Ci: 1
impegniamo: 1
fondo: 1
per: 7
offrire: 1
ai: 3
nostri: 3
shopper: 1
sempre: 2
migliore: 3
esperienza:: 1
semplice,: 1
efficace: 2
e: 12
divertente.: 1
Collaboriamo: 2
numerosi: 1
retailers: 4
manufacturers: 2
fine: 1
condurre: 1
più: 7
gli: 2
shoppers: 4
Tech: 1
Lovers: 1
Crediamo: 1
che: 3
tecnologia: 1
possa: 1
realmente: 1
supportare: 2
i: 6
nel: 3
guidare: 1
il: 9
processo: 1
trasformazione: 2
atto: 1
delle: 3
modalità: 1
cui: 3
consumatori: 1
affrontano: 1
lo: 2
shopping.: 1
La: 3
piattaforma,: 1
grazie: 2
big: 1
data: 1
all’uso: 1
del: 6
machine: 1
learning,: 1
cerca: 1
ogni: 1
giorno

## Dataframe in PySpark

PySpark is Python on Spark, programming language and syntax are the ones that we already know.

*Dataframe* is the approach to analysis via high-level data structures that work in a distributed way. DataFrames is a data structure already known in Python/R that allow you to have a tabular/columnar form of the data. Operations are simple to execute and runs in parallel.

Convert to a friendly structure: dataframe
https://towardsdatascience.com/pyspark-and-sparksql-basics-6cb4bf967e53

In [5]:
from pyspark.sql import SparkSession
spark = SparkSession(sc)

In [6]:
from pyspark.sql import Row # import the pyspark sql Row class

rows = counts.map(lambda p: Row(word=p[0], occurs=int(p[1]))) # tuples -> Rows
#rows.toDF().createOrReplaceTempView("word_count")

In [7]:
df = rows.toDF()

In [8]:
df.printSchema()

root
 |-- occurs: long (nullable = true)
 |-- word: string (nullable = true)



In [9]:
df.head(5)

[Row(occurs=2, word='brand-logo'),
 Row(occurs=2, word='Homepage'),
 Row(occurs=5, word='Chi'),
 Row(occurs=5, word='siamo'),
 Row(occurs=2, word='Soluzioni')]

In [11]:
df.show(10,truncate= True) 
# https://spark.apache.org/docs/2.1.0/api/python/pyspark.sql.html#pyspark.sql.DataFrame.show

+------+----------+
|occurs|      word|
+------+----------+
|     2|brand-logo|
|     2|  Homepage|
|     5|       Chi|
|     5|     siamo|
|     2| Soluzioni|
|     4|    Lavora|
|    11|       con|
|     4|       noi|
|     2|  Contatti|
|     2|  Newsroom|
+------+----------+
only showing top 10 rows



In [12]:
df.count()

491

In [13]:
len(df.columns), df.columns


(2, ['occurs', 'word'])

In [14]:
df.describe().show()

+-------+------------------+------------------+
|summary|            occurs|              word|
+-------+------------------+------------------+
|  count|               491|               491|
|   mean|1.8431771894093687|1265.2222222222222|
| stddev| 3.023352863614764| 1690.425965384005|
|    min|                 1|                  |
|    max|                49|              🇲🇽|
+-------+------------------+------------------+



In [15]:
df.describe('occurs').show()

+-------+------------------+
|summary|            occurs|
+-------+------------------+
|  count|               491|
|   mean|1.8431771894093687|
| stddev| 3.023352863614764|
|    min|                 1|
|    max|                49|
+-------+------------------+



In [16]:
df.select('occurs','word').show(5)


+------+----------+
|occurs|      word|
+------+----------+
|     2|brand-logo|
|     2|  Homepage|
|     5|       Chi|
|     5|     siamo|
|     2| Soluzioni|
+------+----------+
only showing top 5 rows



In [17]:
df.select('word').show(5)

+----------+
|      word|
+----------+
|brand-logo|
|  Homepage|
|       Chi|
|     siamo|
| Soluzioni|
+----------+
only showing top 5 rows



In [18]:
df.select('word').distinct().count(), df.select('word').count(),df.select('word').dropDuplicates().count()

(491, 491, 491)

In [20]:
df.orderBy(df['occurs'].desc()).show(500)

+------+----------------+
|occurs|            word|
+------+----------------+
|    49|                |
|    30|              di|
|    21|              in|
|    12|               e|
|    11|             con|
|    11|              la|
|     9|              il|
|     9|              le|
|     8|               &|
|     8|               I|
|     8|      user-photo|
|     7|               a|
|     7|             per|
|     7|             più|
|     7|         milioni|
|     6|             del|
|     6|           stars|
|     6|        shopping|
|     6|       slideshow|
|     6|               i|
|     6|               —|
|     5|          Scopri|
|     5|           siamo|
|     5|         offerte|
|     5|              al|
|     5|        negozio.|
|     5|            2020|
|     5|             Chi|
|     4|       retailers|
|     4|            fino|
|     4|        shoppers|
|     4|          negozi|
|     4|          nostro|
|     4|              to|
|     4|           Board|
|     4|    

## Moving to SQL

We can also use SQL syntax to analyze Dataframe.We first need to define a virtual SQL table from the dataframe. 


In [21]:
 word_count = df.createOrReplaceTempView("word_count")

In [23]:
spark.sql('select * from word_count order by occurs DESC ').show(5)


+------+----+
|occurs|word|
+------+----+
|    49|    |
|    30|  di|
|    21|  in|
|    12|   e|
|    11| con|
+------+----+
only showing top 5 rows



In [24]:
spark.sql("""
select                
* 
from word_count
WHERE length(word) > 2
ORDER BY occurs  DESC
""").show(5)

+------+----------+
|occurs|      word|
+------+----------+
|    11|       con|
|     8|user-photo|
|     7|       per|
|     7|       più|
|     7|   milioni|
+------+----------+
only showing top 5 rows



In [25]:
# Finally, save our result
df.write.mode('overwrite').option("header", "true").save('wordcount.csv',format='csv')

In [None]:
# Stopping Spark Context
sc.stop()

## Consideration about Dataframe and SQL

Dataframes are interchangeable with SQL. We can register a DataFrame as a SQL table and query it via SQL syntax.

Benefits? High level access to the dataset. DataFrame as a SQL have similar performances. RDD (low level access) is fastest than Dataframe and SQL.
More info here: 
https://community.cloudera.com/t5/Community-Articles/Spark-RDDs-vs-DataFrames-vs-SparkSQL/ta-p/246547
https://stackoverflow.com/questions/45430816/writing-sql-vs-using-dataframe-apis-in-spark-sql


## What's next?

### Dataframe

Good operation overview.
https://towardsdatascience.com/pyspark-and-sparksql-basics-6cb4bf967e53

The basic operations on Dataframe are described in terms operation requirements.
https://www.analyticsvidhya.com/blog/2016/10/spark-dataframe-and-operations/

### SQL
SparkSql follows Hive style, so you can refer to Hive Syntax for documentation.
https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Select

The supported and unsupported Hive features by SparkSql can be found in the official documentation.
https://spark.apache.org/docs/latest/sql-programming-guide.html#compatibility-with-apache-hive

### SparkExamples

Below a reference site for a good hoverview of basic Spark operations (cluster setup, RDD operations, Dataframe operations, SQL operations, Streaming, integration with other frameworks).
It is for Scala but it will be easy get feedback also for PySpark.
https://sparkbyexamples.com/![image.png](attachment:image.png)
