
[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://githubtocolab.com/jkanclerz/data-science-workshop-2021/blob/main/40--spark/02--dataframes.ipynb)

In [1]:
!pip3 install pyspark



In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget https://dlcdn.apache.org/spark/spark-3.2.0/spark-3.2.0-bin-hadoop3.2.tgz -O spark-3.2.0-bin-hadoop3.2.tgz
!tar xf spark-3.2.0-bin-hadoop3.2.tgz

/usr/local/bin/bash: line 1: apt-get: command not found
--2021-12-11 07:18:10--  https://dlcdn.apache.org/spark/spark-3.2.0/spark-3.2.0-bin-hadoop3.2.tgz
Resolving dlcdn.apache.org (dlcdn.apache.org)... 151.101.2.132
Connecting to dlcdn.apache.org (dlcdn.apache.org)|151.101.2.132|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 300965906 (287M) [application/x-gzip]
Saving to: ‘spark-3.2.0-bin-hadoop3.2.tgz’


2021-12-11 07:18:19 (33.7 MB/s) - ‘spark-3.2.0-bin-hadoop3.2.tgz’ saved [300965906/300965906]



In [2]:
!pip install -q pyspark findspark

In [3]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.2.0-bin-hadoop3.2"

In [2]:
from pyspark.sql import SparkSession

spark = SparkSession.builder\
        .master("local")\
        .appName("Spark dataframe")\
        .config('spark.ui.port', '4050')\
        .getOrCreate()

In [3]:
spark

### DataFrames

A DataFrame is an abstraction that presents data together with a schema: the data is organized into named columns, each with a type. Conceptually, a DataFrame is equivalent to a table in a relational database.

### DataSets

In Spark, Datasets are an extension of DataFrames. The Dataset API combines two styles: a strongly typed API and an untyped one. By default, a Dataset is a collection of strongly typed JVM objects, so the typed API is available in Scala and Java but not in Python; in PySpark we work with DataFrames.

**Data frames** let us work with data in a structured way. The data frame abstraction, very popular in other data analytics ecosystems (e.g. R and Python/Pandas), is very powerful for exploratory data analysis, and data queries are easy to express when it is combined with SQL.

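#### Creating an RDD
The most common way of creating an RDD is to load it from a file; Spark's `textFile` can read compressed files directly. For a first, small example, the RDD below is instead built from an in-memory list with `parallelize`; `textFile` is used later to load the CSV file.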

In [4]:
rdd = spark.sparkContext.parallelize([
    (1, 2., 'Jakub'),
    (2, 3., 'Michał'),
    (3, 4., 'Aga')
])


In [5]:
rdd.collect()

[(1, 2.0, 'Jakub'), (2, 3.0, 'Michał'), (3, 4.0, 'Aga')]

In [6]:
df = spark.createDataFrame(rdd, schema=['a', 'b', 'name'])

In [7]:
df

DataFrame[a: bigint, b: double, name: string]

In [8]:
df.toPandas()

   a    b    name
0  1  2.0   Jakub
1  2  3.0  Michał
2  3  4.0     Aga


In [9]:
df.show()

+---+---+------+
|  a|  b|  name|
+---+---+------+
|  1|2.0| Jakub|
|  2|3.0|Michał|
|  3|4.0|   Aga|
+---+---+------+



In [10]:
df.printSchema()

root
 |-- a: long (nullable = true)
 |-- b: double (nullable = true)
 |-- name: string (nullable = true)



In [12]:
!mkdir -p var
!wget -c https://s3.eu-central-1.amazonaws.com/jkan.pl/data-science/uek/otomoto.csv -O var/otomoto.csv

--2021-12-11 07:22:02--  https://s3.eu-central-1.amazonaws.com/jkan.pl/data-science/uek/otomoto.csv
Resolving s3.eu-central-1.amazonaws.com (s3.eu-central-1.amazonaws.com)... 52.219.171.1
Connecting to s3.eu-central-1.amazonaws.com (s3.eu-central-1.amazonaws.com)|52.219.171.1|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 20920699 (20M) [text/csv]
Saving to: ‘var/otomoto.csv’


2021-12-11 07:22:04 (18.6 MB/s) - ‘var/otomoto.csv’ saved [20920699/20920699]



In [14]:
sc = spark.sparkContext

In [15]:
raw_data = sc.textFile('var/otomoto.csv')  # same path the download cell above writes to
csv_data = (raw_data
            .map(lambda l: l.split(";"))
)


In [17]:
header = csv_data.first()
data = csv_data.filter(lambda r: r != header)
data.cache()

PythonRDD[13] at RDD at PythonRDD.scala:53

In [20]:
data.take(2)

[['"6039269187"',
  '"https://www.otomoto.pl/oferta/renault-fluence-bezwypadkowy-instalacja-gazowa-ID6AI9Nx.html"',
  '""',
  '"Renault"',
  '"Fluence"',
  '"Osobowe"',
  '"1 600 cm3"',
  '"5"',
  '""',
  '""',
  '"Sedan"',
  '"2013"',
  '""',
  '""',
  '""',
  '""',
  '""',
  '"Tak"',
  '""',
  '"Benzyna+LPG"',
  '""',
  '"Czechy"',
  '""',
  '"Tak"',
  '"Używane"',
  '"8 listopad 2013"',
  '"Renault Fluence"',
  '"25900.0"',
  '"Manualna"',
  '""',
  '""',
  '"4"',
  '""',
  '"172 000 km"',
  '"Tak"',
  '""',
  '""',
  '"Osoby prywatnej"',
  '""',
  '"Biały"',
  '"Na przednie koła"',
  '"110 KM"'],
 ['"6034090022"',
  '"https://www.otomoto.pl/oferta/mercedes-benz-klasa-g-350d-pakiet-amg-designo-manufaktur-edition-ID6AmqsC.html"',
  '"Tak"',
  '"Mercedes-Benz"',
  '"Klasa G"',
  '"Osobowe"',
  '"2 987 cm3"',
  '"5"',
  '"Tak"',
  '"W463 (1992-)"',
  '"SUV"',
  '"2016"',
  '"Tak"',
  '""',
  '"WDB4633481X263455"',
  '""',
  '""',
  '"Tak"',
  '"Tak"',
  '"Diesel"',
  '"261 g/km"',
  '"

In [21]:
list(enumerate(header))

[(0, '"id"'),
 (1, '"url"'),
 (2, '"Faktura VAT"'),
 (3, '"Marka pojazdu"'),
 (4, '"Model pojazdu"'),
 (5, '"Kategoria"'),
 (6, '"Pojemność skokowa"'),
 (7, '"Liczba miejsc"'),
 (8, '"Zarejestrowany w Polsce"'),
 (9, '"Wersja"'),
 (10, '"Typ"'),
 (11, '"Rok produkcji"'),
 (12, '"Serwisowany w ASO"'),
 (13, '"Leasing"'),
 (14, '"VIN"'),
 (15, '"Homologacja ciężarowa"'),
 (16, '"VAT marża"'),
 (17, '"Pierwszy właściciel"'),
 (18, '"Filtr cząstek stałych"'),
 (19, '"Rodzaj paliwa"'),
 (20, '"Emisja CO2"'),
 (21, '"Kraj pochodzenia"'),
 (22, '"Numer rejestracyjny pojazdu"'),
 (23, '"Bezwypadkowy"'),
 (24, '"Stan"'),
 (25, '"Pierwsza rejestracja"'),
 (26, '"title"'),
 (27, '"price"'),
 (28, '"Skrzynia biegów"'),
 (29, '"Perłowy"'),
 (30, '"Kierownica po prawej (Anglik)"'),
 (31, '"Liczba drzwi"'),
 (32, '"Metalik"'),
 (33, '"Przebieg"'),
 (34, '"Akryl (niemetalizowany)"'),
 (35, '"Możliwość finansowania"'),
 (36, '"Uszkodzony"'),
 (37, '"Oferta od"'),
 (38, '"Kod Silnika"'),
 (39, '"Kolor"'

In [22]:
from pyspark.sql import Row
row_data = (data
    .map(lambda p: Row(
        _id=(p[0]),
        marka=(p[3]),
        model=(p[4]),
        price=(p[27]),
        przebieg=(p[33])
    )
))

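Once we have an RDD of `Row` objects, Spark can infer the schema from the field names and values when we create a DataFrame. All fields are still strings at this point.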

In [23]:
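# SQLContext is deprecated since Spark 3.0; the SparkSession created earlier (`spark`)
# exposes the same createDataFrame() and sql() methods. It is kept for the cells below.
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)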



In [24]:
DF = sqlContext.createDataFrame(row_data)

In [25]:
DF.take(1)

[Row(_id='"6039269187"', marka='"Renault"', model='"Fluence"', price='"25900.0"', przebieg='"172 000 km"')]
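The quote characters survive in every value because `str.split(";")` knows nothing about CSV quoting. As a minimal sketch outside Spark, this is how Python's standard `csv` module treats the same kind of line. The sample line is a shortened, hand-written version of the record above, and `parse_line` is a hypothetical helper name.

```python
import csv

def parse_line(line, delimiter=";"):
    """Parse one CSV line, honouring double-quoted fields."""
    # csv.reader accepts any iterable of lines, so wrap the single line in a list.
    return next(csv.reader([line], delimiter=delimiter, quotechar='"'))

# Shortened sample record, for illustration only.
sample = '"6039269187";"Renault";"Fluence";"25900.0";"172 000 km"'

naive = sample.split(";")    # the quotes stay inside every value
parsed = parse_line(sample)  # the parser removes the quotes

print(naive[1])   # value still wrapped in quote characters
print(parsed[1])  # bare value
```

In principle a function like this could replace the lambda passed to `raw_data.map(...)`, although reading the file with `spark.read` avoids the manual parsing altogether.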

In [26]:
DF.select("marka", "model", "przebieg", "price")\
    .write\
    .format('csv') \
    .mode("overwrite") \
    .options(delimiter=";")\
    .save("adv_model_przebieg")

In [27]:
import pandas as pd

In [28]:
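import glob
# Spark writes one part-*.csv file per partition, so collect all of them.
all_files = glob.glob('./adv_model_przebieg/*.csv')
# Note: the files have no header row, so read_csv promotes the first data row
# to column names (visible in the output below); header=None would avoid that.
df_from_each_file = (pd.read_csv(f, delimiter=";") for f in all_files)
concatenated_df   = pd.concat(df_from_each_file, ignore_index=True)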

In [29]:
concatenated_df

Unnamed: 0,"\Renault\""""","\Fluence\""""","\172 000 km\""""","\25900.0\"""""
0,"\Mercedes-Benz\""""","\Klasa G\""""","\52 904 km\""""","\434900.0\"""""
1,"\Audi\""""","\A5\""""","\56 000 km\""""","\41900.0\"""""
2,"\Volkswagen\""""","\Polo\""""","\88 000 km\""""","\14900.0\"""""
3,"\Peugeot\""""","\206 CC\""""","\176 000 km\""""","\5900.0\"""""
4,"\Volkswagen\""""","\Bora\""""","\300 000 km\""""","\7900.0\"""""
...,...,...,...,...
51180,"\Mazda\""""","\6\""""","\160 700 km\""""","\13555.0\"""""
51181,"\Land Rover\""""","\Range Rover Sport\""""","\128 486 km\""""","\136000.0\"""""
51182,"\Hyundai\""""","\ix20\""""","\5 km\""""","\54900.0\"""""
51183,"\Ford\""""","\Fiesta\""""","\139 000 km\""""","\15900.0\"""""


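Instead of parsing the file by hand, Spark can load CSV directly into a DataFrame; see the documentation on generic load/save functions: https://spark.apache.org/docs/latest/sql-data-sources-load-save-functions.html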

In [30]:
DF = spark.read.load("var/otomoto.csv", format="csv", sep=";", inferSchema="true", header="true")


In [31]:
DF.take(1)

21/12/11 07:26:26 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


[Row(id=6039269187, url='https://www.otomoto.pl/oferta/renault-fluence-bezwypadkowy-instalacja-gazowa-ID6AI9Nx.html', Faktura VAT=None, Marka pojazdu='Renault', Model pojazdu='Fluence', Kategoria='Osobowe', Pojemność skokowa='1 600 cm3', Liczba miejsc=5, Zarejestrowany w Polsce=None, Wersja=None, Typ='Sedan', Rok produkcji=2013, Serwisowany w ASO=None, Leasing=None, VIN=None, Homologacja ciężarowa=None, VAT marża=None, Pierwszy właściciel='Tak', Filtr cząstek stałych=None, Rodzaj paliwa='Benzyna+LPG', Emisja CO2=None, Kraj pochodzenia='Czechy', Numer rejestracyjny pojazdu=None, Bezwypadkowy='Tak', Stan='Używane', Pierwsza rejestracja='8 listopad 2013', title='Renault Fluence', price=25900.0, Skrzynia biegów='Manualna', Perłowy=None, Kierownica po prawej (Anglik)=None, Liczba drzwi=4, Metalik=None, Przebieg='172 000 km', Akryl (niemetalizowany)='Tak', Możliwość finansowania=None, Uszkodzony=None, Oferta od='Osoby prywatnej', Kod Silnika=None, Kolor='Biały', Napęd='Na przednie koła', Moc

In [32]:
type(DF)

pyspark.sql.dataframe.DataFrame

In [33]:
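# registerTempTable is deprecated since Spark 2.0; createOrReplaceTempView is its replacement.
DF.createOrReplaceTempView("cars")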



In [34]:
volvos = sqlContext.sql("""
    SELECT `Marka pojazdu` AS marka, `Model pojazdu` AS model, `price`, `przebieg`, `Rok produkcji` AS rok
    FROM cars
    WHERE `Marka pojazdu` = 'Volvo'
      AND `Model pojazdu` = 'XC 60'
""")

In [35]:
type(volvos)

pyspark.sql.dataframe.DataFrame

The result of a SQL query is a DataFrame, as `type(volvos)` shows, so all DataFrame operations are available; the underlying RDD can be reached through `volvos.rdd`.

In [36]:
volvos.take(1)

[Row(marka='Volvo', model='XC 60', price=89900.0, przebieg='132 789 km', rok=2016)]

In [37]:
volvos.show()

+-----+-----+--------+----------+----+
|marka|model|   price|  przebieg| rok|
+-----+-----+--------+----------+----+
|Volvo|XC 60| 89900.0|132 789 km|2016|
|Volvo|XC 60|121951.0| 25 236 km|2016|
|Volvo|XC 60| 54900.0|221 000 km|2012|
|Volvo|XC 60| 79999.0|140 515 km|2012|
|Volvo|XC 60| 89900.0|159 900 km|2012|
|Volvo|XC 60| 98900.0| 22 739 km|2016|
|Volvo|XC 60| 79555.0| 62 321 km|2012|
|Volvo|XC 60| 52900.0| 60 000 km|2014|
|Volvo|XC 60| 66900.0| 15 000 km|2015|
|Volvo|XC 60| 59900.0|183 000 km|2010|
|Volvo|XC 60|112800.0| 32 000 km|2016|
|Volvo|XC 60|123000.0| 21 000 km|2016|
|Volvo|XC 60| 87000.0|144 000 km|2015|
|Volvo|XC 60| 88900.0| 66 200 km|2014|
|Volvo|XC 60|185900.0|  3 000 km|2017|
|Volvo|XC 60| 75900.0|132 000 km|2014|
|Volvo|XC 60| 43500.0|205 500 km|2010|
|Volvo|XC 60| 68000.0|110 094 km|2014|
|Volvo|XC 60|122900.0| 13 000 km|2017|
|Volvo|XC 60| 79900.0|145 000 km|2016|
+-----+-----+--------+----------+----+
only showing top 20 rows
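The `przebieg` column above is still a string such as `132 789 km`, so it cannot be sorted or averaged numerically. A minimal sketch of turning such values into integers in plain Python follows; `parse_quantity` is a hypothetical helper, and it assumes the number always comes first, with spaces as thousands separators.

```python
import re

# Leading run of digits and whitespace, e.g. "132 789 " in "132 789 km".
_LEADING_NUMBER = re.compile(r"^\s*(\d[\d\s]*)")

def parse_quantity(text):
    """Return the leading integer of strings like '132 789 km', or None."""
    if not text:
        return None
    match = _LEADING_NUMBER.match(text)
    if match is None:
        return None
    # Remove the thousands separators before converting.
    return int(re.sub(r"\s", "", match.group(1)))

print(parse_quantity("132 789 km"))  # 132789
print(parse_quantity("1 600 cm3"))   # 1600 (the digit in the unit is ignored)
print(parse_quantity(""))            # None
```

Inside Spark the same cleanup could be done with a UDF or with `regexp_replace` followed by a cast; which one fits better depends on the data volume.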



In [38]:
for volvo in volvos.collect():
    print("{} for {} with mileage {}".format(volvo.model, volvo.price, volvo.przebieg))

XC 60 for 89900.0 with mileage 132 789 km
XC 60 for 121951.0 with mileage 25 236 km
XC 60 for 54900.0 with mileage 221 000 km
XC 60 for 79999.0 with mileage 140 515 km
XC 60 for 89900.0 with mileage 159 900 km
XC 60 for 98900.0 with mileage 22 739 km
XC 60 for 79555.0 with mileage 62 321 km
XC 60 for 52900.0 with mileage 60 000 km
XC 60 for 66900.0 with mileage 15 000 km
XC 60 for 59900.0 with mileage 183 000 km
XC 60 for 112800.0 with mileage 32 000 km
XC 60 for 123000.0 with mileage 21 000 km
XC 60 for 87000.0 with mileage 144 000 km
XC 60 for 88900.0 with mileage 66 200 km
XC 60 for 185900.0 with mileage 3 000 km
XC 60 for 75900.0 with mileage 132 000 km
XC 60 for 43500.0 with mileage 205 500 km
XC 60 for 68000.0 with mileage 110 094 km
XC 60 for 122900.0 with mileage 13 000 km
XC 60 for 79900.0 with mileage 145 000 km
XC 60 for 94900.0 with mileage 124 999 km
XC 60 for 54000.0 with mileage 290 000 km
XC 60 for 71900.0 with mileage 97 100 km
XC 60 for 87500.0 with mileage 163 000 km

In [39]:
DF.printSchema()

root
 |-- id: long (nullable = true)
 |-- url: string (nullable = true)
 |-- Faktura VAT: string (nullable = true)
 |-- Marka pojazdu: string (nullable = true)
 |-- Model pojazdu: string (nullable = true)
 |-- Kategoria: string (nullable = true)
 |-- Pojemność skokowa: string (nullable = true)
 |-- Liczba miejsc: integer (nullable = true)
 |-- Zarejestrowany w Polsce: string (nullable = true)
 |-- Wersja: string (nullable = true)
 |-- Typ: string (nullable = true)
 |-- Rok produkcji: integer (nullable = true)
 |-- Serwisowany w ASO: string (nullable = true)
 |-- Leasing: string (nullable = true)
 |-- VIN: string (nullable = true)
 |-- Homologacja ciężarowa: string (nullable = true)
 |-- VAT marża: string (nullable = true)
 |-- Pierwszy właściciel: string (nullable = true)
 |-- Filtr cząstek stałych: string (nullable = true)
 |-- Rodzaj paliwa: string (nullable = true)
 |-- Emisja CO2: string (nullable = true)
 |-- Kraj pochodzenia: string (nullable = true)
 |-- Numer rejestracyjny poja
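The schema above was produced by `inferSchema="true"`, which makes Spark scan the data and pick a type for each column. As a rough mental model only (this is not Spark's actual implementation), the idea can be sketched in plain Python: a column gets the narrowest type that fits all of its non-empty values. The function name `infer_column_type` and the type labels are hypothetical.

```python
def infer_column_type(values):
    """Pick the narrowest of 'integer', 'double', 'string' that fits every non-empty value."""
    non_empty = [v for v in values if v not in (None, "")]
    if not non_empty:
        return "string"  # nothing to go on, so fall back to string

    def all_parse(cast):
        # True only if every value can be converted with `cast`.
        for v in non_empty:
            try:
                cast(v)
            except ValueError:
                return False
        return True

    if all_parse(int):
        return "integer"
    if all_parse(float):
        return "double"
    return "string"

print(infer_column_type(["2013", "2016", ""]))
print(infer_column_type(["25900.0", "434900.0"]))
print(infer_column_type(["172 000 km", "52 904 km"]))
```

This is why a value like `172 000 km` stays a string while `Rok produkcji` becomes an integer: a single non-numeric character is enough to keep the whole column textual.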

### Queries as DataFrame operations


In [40]:
(DF
     .select("Marka pojazdu")
     .groupBy("Marka pojazdu")
     .count()
     .show()
)

+-------------+-----+
|Marka pojazdu|count|
+-------------+-----+
|   Volkswagen| 5598|
|   Oldsmobile|    1|
|     Infiniti|   91|
|      Peugeot| 1906|
|          NSU|    1|
|        Lexus|  252|
|       Jaguar|  215|
|       Saturn|    1|
|          Żuk|    1|
|     Maserati|   43|
|        Rover|   32|
|        Aixam|   37|
|         Tata|    2|
|         Jeep|  426|
|       Lancia|   64|
|   Mitsubishi|  533|
|         Mini|  495|
|          Kia| 1061|
|    Chevrolet|  352|
|      Piaggio|    1|
+-------------+-----+
only showing top 20 rows
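For intuition, `groupBy(...).count()` computes what `collections.Counter` computes on a single machine; Spark does the same work per partition and then merges the partial counts. The sketch below uses a made-up list of brands, not the real column.

```python
from collections import Counter

# Made-up sample of the "Marka pojazdu" column, for illustration only.
brands = ["Volvo", "Kia", "Volvo", "Jeep", "Kia", "Volvo"]

counts = Counter(brands)

# most_common() sorts by count, most frequent first.
for brand, n in counts.most_common():
    print(brand, n)
```

The Spark output above comes back in no particular order; adding `.orderBy("count", ascending=False)` before `.show()` would sort it the way `most_common()` does here.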



In [41]:
# Filter before select, so the filter column is still part of the DataFrame.
DF.filter(DF["Marka pojazdu"] == "Volvo") \
    .select("Model pojazdu") \
    .groupBy("Model pojazdu") \
    .count().show()

+-------------+-----+
|Model pojazdu|count|
+-------------+-----+
|          S60|  130|
|          V90|   31|
|          850|    4|
|    Seria 400|    1|
|          V60|  112|
|          S80|   59|
|          745|    1|
|          S90|   48|
|          C30|   45|
|          V70|   74|
|        XC 40|   10|
|          V50|  147|
|          S40|   73|
|        XC 70|   37|
|    Seria 700|    1|
|        XC 60|  219|
|         Inny|    1|
|          C70|   10|
|    Seria 200|    1|
|          V40|  151|
+-------------+-----+
only showing top 20 rows

