# First contact with df

#### Batch data

A DataFrame can be loaded from a CSV, JSON, XML, or Parquet file.

It can also be created from an existing RDD, or from an external data store such as Hive or Cassandra.

The files themselves can live in HDFS or in the local file system.

In [1]:
import findspark
import pyspark
import time
import operator
from pyspark import SparkConf
from pyspark import SparkContext
import pandas as pd


conf = SparkConf()
conf.setMaster("local")
conf.setAppName("spark-basic")
sc = SparkContext(conf = conf)

A first example with 'fake' data:

In [2]:
data1 = {'PassengerId': {0: 1, 1: 2, 2: 3, 3: 4, 4: 5, 5: 6 },
         'Name': {0: 'Owen', 1: 'Florence', 2: 'Laina', 3: 'Lily', 4: 'William', 5: 'Jack'},
         'Sex': {0: 'male', 1: 'female', 2: 'female', 3: 'female', 4: 'male', 5: 'male'},
         'Survived': {0: 0, 1: 1, 2: 1, 3: 1, 4: 0, 5: 0}}

data2 = {'Id': {0: 1, 1: 2, 2: 3, 3: 4, 4: 5},
         'Age': {0: 22, 1: 38, 2: 26, 3: 35, 4: 35},
         'Fare': {0: 7.3, 1: 71.3, 2: 7.9, 3: 53.1, 4: 8.0},
         'Pclass': {0: 3, 1: 1, 2: 3, 3: 1, 4: 3}}

df1_pd = pd.DataFrame(data1, columns=data1.keys())
df2_pd = pd.DataFrame(data2, columns=data2.keys())


We need a SparkSession to work with this type of data. Caution: many examples on the Internet use a variable called "spark" without showing where it comes from. It is the SparkSession created below.

In [3]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

df1 = spark.createDataFrame(df1_pd)
df2 = spark.createDataFrame(df2_pd)
df1.show()

+-----------+--------+------+--------+
|PassengerId|    Name|   Sex|Survived|
+-----------+--------+------+--------+
|          1|    Owen|  male|       0|
|          2|Florence|female|       1|
|          3|   Laina|female|       1|
|          4|    Lily|female|       1|
|          5| William|  male|       0|
|          6|    Jack|  male|       0|
+-----------+--------+------+--------+



Summary of the properties of our object

In [4]:
df1.printSchema()

root
 |-- PassengerId: long (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Survived: long (nullable = true)



In [5]:
type(df1)

pyspark.sql.dataframe.DataFrame

### Operating with dataframes

If you have experience coding with the package dplyr from R, you have more than half of it covered!

**Filter**, **select**, **mutate**, **summarize** and **arrange** are the basic verbs here too, although some of them have different names...

In [13]:
df1.select(["Name","Sex"]).show()

+--------+------+
|    Name|   Sex|
+--------+------+
|    Owen|  male|
|Florence|female|
|   Laina|female|
|    Lily|female|
| William|  male|
|    Jack|  male|
+--------+------+



In [6]:
df1.filter(df1.Sex == 'female').filter(df1.Survived == 1).show() 

+-----------+--------+------+--------+
|PassengerId|    Name|   Sex|Survived|
+-----------+--------+------+--------+
|          2|Florence|female|       1|
|          3|   Laina|female|       1|
|          4|    Lily|female|       1|
+-----------+--------+------+--------+



But we can also do it SQL-style, passing the condition as a string. Note that the content of the string is a SQL expression, not Python syntax!

In [7]:
df1.filter("Sex='female'").show()

+-----------+--------+------+--------+
|PassengerId|    Name|   Sex|Survived|
+-----------+--------+------+--------+
|          2|Florence|female|       1|
|          3|   Laina|female|       1|
|          4|    Lily|female|       1|
+-----------+--------+------+--------+



And withColumn is the equivalent of mutate:

In [8]:
hello = df2.withColumn('PricePerYear', df2.Fare/df2.Age)

In [9]:
hello

DataFrame[Id: bigint, Age: bigint, Fare: double, Pclass: bigint, PricePerYear: double]

groupby is needed when we want to summarize our data. The columns to aggregate can be passed as an unpacked list or as a dictionary.

In [10]:
df2.groupby('Pclass').avg(*["Age","Fare"]).show()

+------+------------------+-----------------+
|Pclass|          avg(Age)|        avg(Fare)|
+------+------------------+-----------------+
|     1|              36.5|             62.2|
|     3|27.666666666666668|7.733333333333333|
+------+------------------+-----------------+



In [11]:
df2.groupby('Pclass').agg({'Age': 'avg', 'Fare': 'avg'}).show()

+------+------------------+-----------------+
|Pclass|          avg(Age)|        avg(Fare)|
+------+------------------+-----------------+
|     1|              36.5|             62.2|
|     3|27.666666666666668|7.733333333333333|
+------+------------------+-----------------+



If you are not familiar with the \* syntax, it means "unpack": the list is expanded so that each element is passed to avg() as a separate argument.
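As a quick illustration of unpacking in plain Python (the function describe is made up for this sketch and is not part of PySpark):

```python
# Hypothetical helper that, like avg(), accepts any number of positional arguments
def describe(*cols):
    return "avg of " + ", ".join(cols)

columns = ["Age", "Fare"]

# With the star, each element of the list becomes its own argument,
# so both calls below are equivalent
unpacked = describe(*columns)
explicit = describe("Age", "Fare")

print(unpacked)              # avg of Age, Fare
print(unpacked == explicit)  # True
```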

If we want different functions at the same time, we need a dictionary that maps each column to its function:

In [12]:
df2.groupby('Pclass').agg({'*': 'count', 'Age': 'avg', 'Fare': 'sum'}).show()

+------+--------+------------------+---------+
|Pclass|count(1)|          avg(Age)|sum(Fare)|
+------+--------+------------------+---------+
|     1|       2|              36.5|    124.4|
|     3|       3|27.666666666666668|     23.2|
+------+--------+------------------+---------+



To present the summarized data in a nicer format, we can use toDF() to rename the columns:

In [13]:
(
    df2.groupby('Pclass').agg({'*': 'count', 'Age': 'avg', 'Fare': 'sum'})
    .toDF('Pclass', 'counts', 'average_age', 'total_fare')
    .show()
)

+------+------+------------------+----------+
|Pclass|counts|       average_age|total_fare|
+------+------+------------------+----------+
|     1|     2|              36.5|     124.4|
|     3|     3|27.666666666666668|      23.2|
+------+------+------------------+----------+



And finally, we can **arrange** dataframes using the method sort

In [14]:
df2.sort('Age', ascending=True).show()

+---+---+----+------+
| Id|Age|Fare|Pclass|
+---+---+----+------+
|  1| 22| 7.3|     3|
|  3| 26| 7.9|     3|
|  4| 35|53.1|     1|
|  5| 35| 8.0|     3|
|  2| 38|71.3|     1|
+---+---+----+------+



Good reference: https://dzone.com/articles/pyspark-dataframe-tutorial-introduction-to-datafra 

## Basic joins and unions

In [21]:
df1.join(df2, df2.Id == df1.PassengerId).show()

+-----------+--------+------+--------+---+---+----+------+
|PassengerId|    Name|   Sex|Survived| Id|Age|Fare|Pclass|
+-----------+--------+------+--------+---+---+----+------+
|          5| William|  male|       0|  5| 35| 8.0|     3|
|          1|    Owen|  male|       0|  1| 22| 7.3|     3|
|          3|   Laina|female|       1|  3| 26| 7.9|     3|
|          2|Florence|female|       1|  2| 38|71.3|     1|
|          4|    Lily|female|       1|  4| 35|53.1|     1|
+-----------+--------+------+--------+---+---+----+------+



In [22]:
df1.show()

+-----------+--------+------+--------+
|PassengerId|    Name|   Sex|Survived|
+-----------+--------+------+--------+
|          1|    Owen|  male|       0|
|          2|Florence|female|       1|
|          3|   Laina|female|       1|
|          4|    Lily|female|       1|
|          5| William|  male|       0|
|          6|    Jack|  male|       0|
+-----------+--------+------+--------+



Is it symmetric? Where is Jack? :'(

If you have some time, explore other types of join, such as "left", "right", "cross", "inner"

In [None]:
#### Your code here

In [17]:
print((df1.count(), len(df1.columns)))
print((df2.count(), len(df2.columns)))

(6, 4)
(5, 4)


In [14]:
df2.show()

+---+---+----+------+
| Id|Age|Fare|Pclass|
+---+---+----+------+
|  1| 22| 7.3|     3|
|  2| 38|71.3|     1|
|  3| 26| 7.9|     3|
|  4| 35|53.1|     1|
|  5| 35| 8.0|     3|
+---+---+----+------+



In [22]:
left_join = df1.join(df2, df2.Id == df1.PassengerId, how='left')
left_join.show()

+-----------+--------+------+--------+----+----+----+------+
|PassengerId|    Name|   Sex|Survived|  Id| Age|Fare|Pclass|
+-----------+--------+------+--------+----+----+----+------+
|          6|    Jack|  male|       0|null|null|null|  null|
|          5| William|  male|       0|   5|  35| 8.0|     3|
|          1|    Owen|  male|       0|   1|  22| 7.3|     3|
|          3|   Laina|female|       1|   3|  26| 7.9|     3|
|          2|Florence|female|       1|   2|  38|71.3|     1|
|          4|    Lily|female|       1|   4|  35|53.1|     1|
+-----------+--------+------+--------+----+----+----+------+



In [20]:
from pyspark.sql.functions import col

# Keep the rows of df1 that found no match in df2 (only Jack in this data)
left_join.filter(col('Id').isNull()).show()

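And union doesn't do what we might expect: it stacks rows by column position and ignores the column names, so the values of df2 land under the headers of df1. Why would we need it?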

In [23]:
df1.union(df2).show()

+-----------+--------+------+--------+
|PassengerId|    Name|   Sex|Survived|
+-----------+--------+------+--------+
|          1|    Owen|  male|       0|
|          2|Florence|female|       1|
|          3|   Laina|female|       1|
|          4|    Lily|female|       1|
|          5| William|  male|       0|
|          6|    Jack|  male|       0|
|          1|      22|   7.3|       3|
|          2|      38|  71.3|       1|
|          3|      26|   7.9|       3|
|          4|      35|  53.1|       1|
|          5|      35|   8.0|       3|
+-----------+--------+------+--------+



A weird join procedure... what is going on here? Can you work out why it would be useful (and dangerous)?

In [25]:
df1.join(df2, df1.PassengerId <= df2.Id).show()

+-----------+--------+------+--------+---+---+----+------+
|PassengerId|    Name|   Sex|Survived| Id|Age|Fare|Pclass|
+-----------+--------+------+--------+---+---+----+------+
|          1|    Owen|  male|       0|  1| 22| 7.3|     3|
|          1|    Owen|  male|       0|  2| 38|71.3|     1|
|          1|    Owen|  male|       0|  3| 26| 7.9|     3|
|          1|    Owen|  male|       0|  4| 35|53.1|     1|
|          1|    Owen|  male|       0|  5| 35| 8.0|     3|
|          2|Florence|female|       1|  2| 38|71.3|     1|
|          2|Florence|female|       1|  3| 26| 7.9|     3|
|          2|Florence|female|       1|  4| 35|53.1|     1|
|          2|Florence|female|       1|  5| 35| 8.0|     3|
|          3|   Laina|female|       1|  3| 26| 7.9|     3|
|          3|   Laina|female|       1|  4| 35|53.1|     1|
|          3|   Laina|female|       1|  5| 35| 8.0|     3|
|          4|    Lily|female|       1|  4| 35|53.1|     1|
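|          4|    Lily|female|       1|  5| 35| 8.0|     3|
|          5| William|  male|       0|  5| 35| 8.0|     3|
+-----------+--------+------+--------+---+---+----+------+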
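To see why this kind of join can explode, the same condition can be replayed in plain Python over the two key columns. With an equality condition each key finds at most one partner in this data; with <= every left row is paired with all right rows that have a larger or equal Id.

```python
passenger_ids = [1, 2, 3, 4, 5, 6]   # values of df1.PassengerId
ids = [1, 2, 3, 4, 5]                # values of df2.Id

# Equi-join: one output row per matching key
equal_pairs = [(p, i) for p in passenger_ids for i in ids if p == i]

# Non-equi join: every pair that satisfies the inequality
less_equal_pairs = [(p, i) for p in passenger_ids for i in ids if p <= i]

print(len(equal_pairs))       # 5
print(len(less_equal_pairs))  # 15
```

In the worst case the result has as many rows as the product of both table sizes, which is what makes such joins risky on big tables.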

And... what about doing joins with SQL?

If we want to do that, we will need temporary views! It sounds worse than it actually is.

In [27]:
df1.createOrReplaceTempView('df1_temp')
df2.createOrReplaceTempView('df2_temp')

query = '''
    select *
    from df1_temp a, df2_temp b
    where a.PassengerId = b.Id'''

spark.sql(query).show()

+-----------+--------+------+--------+---+---+----+------+
|PassengerId|    Name|   Sex|Survived| Id|Age|Fare|Pclass|
+-----------+--------+------+--------+---+---+----+------+
|          5| William|  male|       0|  5| 35| 8.0|     3|
|          1|    Owen|  male|       0|  1| 22| 7.3|     3|
|          3|   Laina|female|       1|  3| 26| 7.9|     3|
|          2|Florence|female|       1|  2| 38|71.3|     1|
|          4|    Lily|female|       1|  4| 35|53.1|     1|
+-----------+--------+------+--------+---+---+----+------+



In [23]:
from pyspark.sql.functions import round, col

resumen = df2.groupby('Pclass').avg("Age").toDF("Clase","Media")
resumen.select("*",round(col("Media"),2)).show()

+-----+------------------+---------------+
|Clase|             Media|round(Media, 2)|
+-----+------------------+---------------+
|    1|              36.5|           36.5|
|    3|27.666666666666668|          27.67|
+-----+------------------+---------------+



In [24]:
resumen = df2.groupby('Pclass').avg("Age").toDF("Clase","Media")
resumen.select("*",round(resumen["Media"],2)).show()

+-----+------------------+---------------+
|Clase|             Media|round(Media, 2)|
+-----+------------------+---------------+
|    1|              36.5|           36.5|
|    3|27.666666666666668|          27.67|
+-----+------------------+---------------+



## Importing and exporting data

#### Indirectly

In [28]:
!pip3 install openpyxl

Collecting openpyxl
  Downloading openpyxl-3.0.7-py2.py3-none-any.whl (243 kB)
Collecting et-xmlfile
  Downloading et_xmlfile-1.0.1.tar.gz (8.4 kB)
Building wheels for collected packages: et-xmlfile
Successfully built et-xmlfile
Installing collected packages: et-xmlfile, openpyxl
Successfully installed et-xmlfile-1.0.1 openpyxl-3.0.7


In [6]:
prod = pd.read_excel("FoodMarket.xlsx",sheet_name="Products")
cust = pd.read_excel("FoodMarket.xlsx",sheet_name="Customers")
purc = pd.read_excel("FoodMarket.xlsx",sheet_name="Purchases")
sell = pd.read_excel("FoodMarket.xlsx",sheet_name="Sellers")

In [7]:
type(prod)

pandas.core.frame.DataFrame

In [8]:
prod = spark.createDataFrame(prod)
cust = spark.createDataFrame(cust)
purc = spark.createDataFrame(purc)
sell = spark.createDataFrame(sell)

In [9]:
purc.show()

+----+-----+-------+--------+------+
|Code|Buyer|Product|Quantity|Seller|
+----+-----+-------+--------+------+
|   1|  139|     14|       7|    33|
|   2|  127|     51|       3|    17|
|   3|  137|     10|      15|    22|
|   4|   58|     54|       3|    30|
|   5|  196|     57|       8|    11|
|   6|  135|     19|       6|     3|
|   7|   28|     59|      19|    10|
|   8|  193|     60|      18|    18|
|   9|   68|      5|       3|    33|
|  10|   30|     51|      12|     3|
|  11|   80|      8|       9|    29|
|  12|   90|     12|       3|    12|
|  13|   72|     30|       7|    38|
|  14|  174|     53|      11|    15|
|  15|  167|     35|      11|    45|
|  16|  125|     13|      16|    21|
|  17|  120|     24|       1|    20|
|  18|  141|     28|      16|    13|
|  19|  198|     35|      11|    49|
|  20|   42|     32|       6|    52|
+----+-----+-------+--------+------+
only showing top 20 rows



#### Directly

In [25]:
spark.read.csv("C:/Users/joangj/OneDrive - NETMIND/BTS/2020/data/FoodMarket.csv",sep=";",header = True).show()

Py4JJavaError: An error occurred while calling o169.csv.
: org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme "C"
	...

The read fails because Hadoop has interpreted the drive letter C: as a URI scheme. This typically happens when a Windows-style path is used on a machine where it is not valid (for example, inside a Linux container). Point spark.read.csv to a path that exists where Spark is running, such as a relative path.


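How would you solve the problem of the decimal format? (Files that use ";" as the separator often write decimals with a comma, such as 7,3, and Spark then reads those columns as strings.)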

#### Writing the file back again

An easy way, but it generates a new folder with extra files (one part file per partition, plus marker files)...

In [42]:
# mode="overwrite" replaces the folder if it already exists;
# with the default mode the call fails with "path ... already exists"
prod.write.csv("Newdata", header=True, mode="overwrite")

Using pandas (only for data small enough to fit in the memory of the driver)...

In [43]:
prod.toPandas().to_csv('mycsv.csv')

Let's see how many partitions our data has!

In [44]:
purc.rdd.getNumPartitions() 

1

In [39]:
new_df = purc.groupBy("Seller").count()

In [45]:
new_df.show()

+------+-----+
|Seller|count|
+------+-----+
|    29|   32|
|    26|   25|
|    19|   28|
|    54|   29|
|    22|   37|
|     7|   34|
|    34|   23|
|    50|   29|
|    57|   18|
|    32|   18|
|    43|   28|
|    31|   26|
|    39|   39|
|    25|   33|
|     6|   17|
|    58|   20|
|     9|   21|
|    27|   20|
|    56|   25|
|    51|   18|
+------+-----+
only showing top 20 rows



In [41]:
new_df.rdd.getNumPartitions() 

200

As you can see, the number of partitions suddenly increases. The groupBy triggers a shuffle, and the Spark SQL module sets spark.sql.shuffle.partitions to 200 by default.

Depending on your use case, this can be beneficial or harmful. In this particular case, the data volume is not enough to fill 200 partitions, which causes unnecessary map-reduce tasks and, when the result is written, many very small files in HDFS, which is not desirable.

If we want to change this parameter (and many others), we can do it through the configuration of the SparkSession. Older code does the same through SQLContext, which is deprecated since Spark 3.0.

In [46]:
spark.conf.set("spark.sql.shuffle.partitions", "10")