![](imgs/kodolamaczlogo.png)

# Big Data Processing with Apache Spark

Notebook author: Jakub Nowacki.

## Spark 2.0

Spark 2.0 introduced many performance optimizations; see the [Databricks blog post](https://databricks.com/blog/2016/07/26/introducing-apache-spark-2-0.html).

The API has also been tidied up somewhat. Historically, a job was controlled through several different objects: `SparkContext`, `SQLContext`, or `HiveContext`. In Spark 2.0 they were all unified behind a single entry point, the `SparkSession` object. Jobs are now started as follows:

In [1]:
import pyspark
import pyspark.sql.functions as func

spark = pyspark.sql.SparkSession.builder \
    .appName('spark_2_test') \
    .master('local[2]') \
    .enableHiveSupport() \
    .getOrCreate()

In [2]:
lines = spark.read.text('data/titus_andronicus.txt')
lines.printSchema()
lines.show()
lines

root
 |-- value: string (nullable = true)

+--------------------+
|               value|
+--------------------+
|                    |
|This Etext file i...|
|cooperation with ...|
|Future and Shakes...|
|Etexts that are N...|
|                    |
|*This Etext has c...|
|                    |
|<<THIS ELECTRONIC...|
|SHAKESPEARE IS CO...|
|PROVIDED BY PROJE...|
|MACHINE READABLE ...|
|(1) ARE FOR YOUR ...|
|DISTRIBUTED OR US...|
|DISTRIBUTION INCL...|
|TIME OR FOR MEMBE...|
|                    |
|*Project Gutenber...|
|in the presentati...|
|for your reading ...|
+--------------------+
only showing top 20 rows



DataFrame[value: string]

In [6]:
words = lines.select(func.split(lines.value, r'\W+').alias('words'))
words.show()
words.printSchema()

+--------------------+
|               words|
+--------------------+
|                  []|
|[This, Etext, fil...|
|[cooperation, wit...|
|[Future, and, Sha...|
|[Etexts, that, ar...|
|                  []|
|[, This, Etext, h...|
|                  []|
|[, THIS, ELECTRON...|
|[SHAKESPEARE, IS,...|
|[PROVIDED, BY, PR...|
|[MACHINE, READABL...|
|[, 1, ARE, FOR, Y...|
|[DISTRIBUTED, OR,...|
|[DISTRIBUTION, IN...|
|[TIME, OR, FOR, M...|
|                  []|
|[, Project, Guten...|
|[in, the, present...|
|[for, your, readi...|
+--------------------+
only showing top 20 rows

root
 |-- words: array (nullable = true)
 |    |-- element: string (containsNull = true)



In [7]:
words = lines.select(func.explode(func.split(lines.value, r'\W+')).alias('words'))
words.show()

+-----------+
|      words|
+-----------+
|           |
|       This|
|      Etext|
|       file|
|         is|
|  presented|
|         by|
|    Project|
|  Gutenberg|
|         in|
|cooperation|
|       with|
|      World|
|    Library|
|        Inc|
|       from|
|      their|
|    Library|
|         of|
|        the|
+-----------+
only showing top 20 rows



In [8]:
counts = words.groupBy('words').count()
counts.show()

+-----------+-----+
|      words|count|
+-----------+-----+
|       AWAY|    1|
|         By|   16|
|      those|    8|
|irreligious|    2|
|       hope|    9|
|      Aside|   15|
|      crest|    1|
|        art|   21|
|ingratitude|    2|
|       some|   32|
|        Sit|    2|
|       Goth|    4|
|     distil|    1|
|      still|    7|
|     ransom|    2|
|        fog|    1|
|     poetry|    1|
|     Heaven|    1|
|    blossom|    1|
|      Virgo|    1|
+-----------+-----+
only showing top 20 rows



In [17]:
words.select(func.lower(func.col('words')).alias('words'))\
    .where(func.length('words') > 0).show()

+-----------+
|      words|
+-----------+
|       this|
|      etext|
|       file|
|         is|
|  presented|
|         by|
|    project|
|  gutenberg|
|         in|
|cooperation|
|       with|
|      world|
|    library|
|        inc|
|       from|
|      their|
|    library|
|         of|
|        the|
|     future|
+-----------+
only showing top 20 rows



In [37]:
def col_len(col_name):
    return func.length(col_name).cast('string').alias('len_{}'.format(col_name))
l = col_len('words')
m = col_len('m')
l, type(l), str(m)

(Column<b'CAST(length(words) AS STRING) AS `len_words`'>,
 pyspark.sql.column.Column,
 "Column<b'CAST(length(m) AS STRING) AS `len_m`'>")

In [34]:
words.where(l > 0)\
    .withColumn('words_lower', func.lower(words['words']))\
    .groupBy('words_lower')\
    .agg(func.count('*').alias('count'), 
         func.collect_set('words').alias('variants'))\
    .orderBy(func.size('variants'), 'variants', ascending=False)\
    .show()

+-----------+-----+--------------------+
|words_lower|count|            variants|
+-----------+-----+--------------------+
|       with|  284|  [with, WITH, With]|
|   tribunes|   18|[tribunes, Tribun...|
|       time|   21|  [time, TIME, Time]|
|       this|  248|  [this, This, THIS]|
|        the|  740|     [the, The, THE]|
|         so|  113|        [so, SO, So]|
|       send|   11|  [send, SEND, Send]|
|   readable|   12|[readable, READAB...|
|         or|  110|        [or, Or, OR]|
|      money|    6|[money, Money, MO...|
|        may|   60|     [may, May, MAY]|
|       long|   25|  [long, Long, LONG]|
|    library|   17|[library, Library...|
|         it|  162|        [it, It, IT]|
|         is|  200|        [is, IS, Is]|
|        for|  262|     [for, For, FOR]|
|     domain|    4|[domain, Domain, ...|
|   complete|   16|[complete, Comple...|
|         by|  131|        [by, BY, By]|
|        but|  121|     [but, BUT, But]|
+-----------+-----+--------------------+
only showing top 20 rows

In [41]:
words.where(words.words.rlike('^(t|T).*')).show()

+-----+
|words|
+-----+
| This|
|their|
|  the|
| that|
|  the|
| This|
| THIS|
|  THE|
| THAT|
| TIME|
|   to|
|  The|
|  the|
|  The|
| THIS|
|  THE|
|  THE|
| THIS|
|   TO|
| THIS|
+-----+
only showing top 20 rows



In [44]:
func.trunc?

### Exercise

Using the DataFrame or SQL interface and functions:

1. Improve the quality of the input data and the presentation of the results.
1. Give the count of words starting with the letter *t*.
1. ★ Perform a mapping that uses the Python `len` function to compute the average word length *(not recommended in practice)*.
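
As a way to sanity-check results from the exercise, the same word statistics can be computed in plain Python on a small sample. This is only a sketch and not the Spark solution: `sample_lines` is made-up data, and `tokenize` is a hypothetical helper that mirrors `split(value, '\W+')` followed by `explode`, lowercasing, and dropping empty strings. For plain ASCII text the Python and Spark regular expressions should split the same way; for other text they may differ.

```python
import re
from collections import Counter

# Made-up sample lines (assumption); the notebook reads them from a text file.
sample_lines = [
    "This Etext file is presented by Project Gutenberg",
    "*This Etext has certain copyright implications",
    "",
]


def tokenize(line):
    """Split on runs of non-word characters, lowercase, drop empty tokens."""
    return [w.lower() for w in re.split(r"\W+", line) if w]


# Flatten all lines into one list of words (the plain-Python analogue of explode).
words = [w for line in sample_lines for w in tokenize(line)]
counts = Counter(words)

# Number of word occurrences starting with the letter t.
t_total = sum(n for w, n in counts.items() if w.startswith("t"))

# Average word length computed with Python's len.
avg_len = sum(len(w) for w in words) / len(words)

print(counts.most_common(3))
print(t_total, round(avg_len, 2))
```

Running the Spark version on the same few lines and comparing the numbers is a cheap way to catch empty tokens or case variants that slipped through.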

## What about SparkContext?

If needed, it is still available from the Spark session:

In [45]:
spark.sparkContext

Of course, RDDs are still available too:

In [46]:
words.rdd.take(10)

[Row(words=''),
 Row(words='This'),
 Row(words='Etext'),
 Row(words='file'),
 Row(words='is'),
 Row(words='presented'),
 Row(words='by'),
 Row(words='Project'),
 Row(words='Gutenberg'),
 Row(words='in')]

## New file formats

The only change is that *CSV* is now a built-in format; the *text* format shown above has been available since version 1.6.

In [49]:
csv = spark.read.csv('data/rollingsales_bronx.csv')
csv.printSchema()
csv.show()

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)
 |-- _c4: string (nullable = true)
 |-- _c5: string (nullable = true)
 |-- _c6: string (nullable = true)
 |-- _c7: string (nullable = true)
 |-- _c8: string (nullable = true)
 |-- _c9: string (nullable = true)
 |-- _c10: string (nullable = true)
 |-- _c11: string (nullable = true)
 |-- _c12: string (nullable = true)
 |-- _c13: string (nullable = true)
 |-- _c14: string (nullable = true)
 |-- _c15: string (nullable = true)
 |-- _c16: string (nullable = true)
 |-- _c17: string (nullable = true)
 |-- _c18: string (nullable = true)
 |-- _c19: string (nullable = true)
 |-- _c20: string (nullable = true)

+---+--------------------+--------------------+---+----+---+---+---+--------------------+------------+-----+----+----+----+-----+-----+----+---------+----+--------+----------+
|_c0|                 _c1|                 _c2|_c3| _c4|_c5|_c6|_c7|    

In [61]:
houses = csv.select(
    func.trim(func.col('_c1')).alias('hood'), 
    func.trim(func.col('_c2')).alias('type'),
    func.regexp_replace(func.col('_c14'), '[^0-9.]', '').cast('int').alias('landArea'),
    func.regexp_replace(func.col('_c15'), '[^0-9.]', '').cast('int').alias('grossArea'),
    func.col('_c16').cast('int').alias('year'),
    func.regexp_replace(func.col('_c19'), '[^0-9.]', '').cast('int').alias('price')
)
houses.show()

+--------+--------------------+--------+---------+----+------+
|    hood|                type|landArea|grossArea|year| price|
+--------+--------------------+--------+---------+----+------+
|BATHGATE|01  ONE FAMILY HOMES|    1842|     2048|1901|355000|
|BATHGATE|01  ONE FAMILY HOMES|    1103|     1290|1910|474819|
|BATHGATE|01  ONE FAMILY HOMES|    1986|     1344|1899|210000|
|BATHGATE|01  ONE FAMILY HOMES|    2329|     1431|1901|343116|
|BATHGATE|02  TWO FAMILY HOMES|    1855|     4452|1931|     0|
|BATHGATE|02  TWO FAMILY HOMES|    2000|     2400|1993|316500|
|BATHGATE|02  TWO FAMILY HOMES|    2498|     2394|1995|390000|
|BATHGATE|02  TWO FAMILY HOMES|    1542|     1542|1899|207000|
|BATHGATE|02  TWO FAMILY HOMES|    1819|     2340|1998|     0|
|BATHGATE|02  TWO FAMILY HOMES|    1667|     1296|1910|369000|
|BATHGATE|02  TWO FAMILY HOMES|    5000|     5881|1910|308000|
|BATHGATE|02  TWO FAMILY HOMES|    2483|     1512|1901|     0|
|BATHGATE|02  TWO FAMILY HOMES|    1562|     3382|2004|

### Exercise

Using the DataFrame or SQL interface and functions:

1. Fix the values that were read incorrectly (*null*).
1. Compute average values grouped by neighborhood and type.
1. Save the result to a CSV file.
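
The numeric columns above are cleaned with `regexp_replace(..., '[^0-9.]', '')` before the cast. A plain-Python sketch of that step may help when looking for the source of *null* values. The input strings below are invented examples, and `to_int` is a hypothetical helper that returns `None` when nothing parseable is left, which is an assumption about how such values end up as *null*.

```python
import re


def to_int(raw):
    """Strip everything except digits and dots, then try to parse an integer."""
    if raw is None:
        return None
    cleaned = re.sub(r"[^0-9.]", "", raw)
    try:
        return int(float(cleaned))
    except ValueError:
        # Nothing numeric left, e.g. an empty field or a header label.
        return None


# Invented sample values (assumption), not taken from the data set.
samples = ["1,842", "$355,000", " 2048 ", "", "SALE PRICE", None]
cleaned = [to_int(s) for s in samples]
print(cleaned)
```

If the first row of the file holds column labels, it would be cleaned to an empty string in the same way, so checking whether the file has a header row is a reasonable first step.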

In [80]:
houses.groupBy('type').pivot('hood', ['BATHGATE', 'BAYCHESTER'])\
    .agg(func.avg('price').cast('int').alias('price'), 
         func.avg('year').cast('int').alias('year'))\
    .orderBy('type') \
    .dropna('all', subset=['BATHGATE_price', 'BATHGATE_year', 'BAYCHESTER_price', 'BAYCHESTER_year'])\
    .show()

+--------------------+--------------+-------------+----------------+---------------+
|                type|BATHGATE_price|BATHGATE_year|BAYCHESTER_price|BAYCHESTER_year|
+--------------------+--------------+-------------+----------------+---------------+
|01  ONE FAMILY HOMES|        345733|         1902|          175829|           1941|
|02  TWO FAMILY HOMES|        203427|         1947|          214003|           1961|
|03  THREE FAMILY ...|        292019|         1919|          244380|           1978|
|04  TAX CLASS 1 C...|          null|         null|          282164|           2008|
|05  TAX CLASS 1 V...|         40730|            0|          134508|            402|
|06  TAX CLASS 1 -...|          null|         null|            2500|           1800|
|07  RENTALS - WAL...|        424286|         1924|           83333|           1990|
|10  COOPS - ELEVA...|         27000|         1941|          129875|           1965|
|14  RENTALS - 4-1...|             0|         1931|            nu

In [83]:
[func.col(c).alias('my_{}'.format(c)) for c in houses.columns]

[Column<b'hood AS `my_hood`'>,
 Column<b'type AS `my_type`'>,
 Column<b'landArea AS `my_landArea`'>,
 Column<b'grossArea AS `my_grossArea`'>,
 Column<b'year AS `my_year`'>,
 Column<b'price AS `my_price`'>]

## SQL subqueries

The range of supported SQL queries has also been extended, especially the use of subqueries, which originally could only be used in the `FROM` clause; see the [details on the Databricks blog](https://databricks.com/blog/2016/06/17/sql-subqueries-in-apache-spark-2-0.html). Queries such as the following are now possible:

In [84]:
# create a temporary view for the SQL queries
houses.createTempView('houses')

In [85]:
spark.sql("""
SELECT * FROM houses 
WHERE year > (SELECT avg(year) FROM houses) 
""").show()

+--------+--------------------+--------+---------+----+------+
|    hood|                type|landArea|grossArea|year| price|
+--------+--------------------+--------+---------+----+------+
|BATHGATE|01  ONE FAMILY HOMES|    1842|     2048|1901|355000|
|BATHGATE|01  ONE FAMILY HOMES|    1103|     1290|1910|474819|
|BATHGATE|01  ONE FAMILY HOMES|    1986|     1344|1899|210000|
|BATHGATE|01  ONE FAMILY HOMES|    2329|     1431|1901|343116|
|BATHGATE|02  TWO FAMILY HOMES|    1855|     4452|1931|     0|
|BATHGATE|02  TWO FAMILY HOMES|    2000|     2400|1993|316500|
|BATHGATE|02  TWO FAMILY HOMES|    2498|     2394|1995|390000|
|BATHGATE|02  TWO FAMILY HOMES|    1542|     1542|1899|207000|
|BATHGATE|02  TWO FAMILY HOMES|    1819|     2340|1998|     0|
|BATHGATE|02  TWO FAMILY HOMES|    1667|     1296|1910|369000|
|BATHGATE|02  TWO FAMILY HOMES|    5000|     5881|1910|308000|
|BATHGATE|02  TWO FAMILY HOMES|    2483|     1512|1901|     0|
|BATHGATE|02  TWO FAMILY HOMES|    1562|     3382|2004|

In [86]:
spark.sql("""
SELECT * FROM houses 
WHERE hood IN 
    (SELECT hood FROM
        (SELECT hood, avg(year) FROM houses GROUP BY hood ORDER BY avg(year) DESC LIMIT 5)
    ) 
""").show()

+--------------------+--------------------+--------+---------+----+-------+
|                hood|                type|landArea|grossArea|year|  price|
+--------------------+--------------------+--------+---------+----+-------+
|          BRONX PARK|02  TWO FAMILY HOMES|    2029|     4197|1915| 215000|
|          BRONX PARK|29  COMMERCIAL GA...|   12500|     2500|1951|2200000|
|CITY ISLAND-PELHA...|01  ONE FAMILY HOMES|   32202|     7122|1920|      0|
|CITY ISLAND-PELHA...|01  ONE FAMILY HOMES|   19200|     2500|1975|1062500|
|CITY ISLAND-PELHA...|01  ONE FAMILY HOMES|   12992|     1832|1975| 995000|
|CITY ISLAND-PELHA...|01  ONE FAMILY HOMES|   24840|    10945|1965|      0|
|CITY ISLAND-PELHA...|01  ONE FAMILY HOMES|    7500|     1848|1940| 635000|
|CITY ISLAND-PELHA...|01  ONE FAMILY HOMES|   18227|     8421|1986|      0|
|          CO-OP CITY|01  ONE FAMILY HOMES|    3750|     1080|1920| 175000|
|          CO-OP CITY|02  TWO FAMILY HOMES|    1908|     2317|2011| 520000|
|          C
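
The two query shapes above, a scalar subquery in `WHERE` and an `IN` subquery, can be tried without a Spark cluster. The sketch below uses Python's built-in `sqlite3` as a stand-in for `spark.sql`, so it illustrates the SQL only, not Spark's behavior. The table rows are invented, and `LIMIT 2` replaces the `LIMIT 5` used above because the sample has only three neighborhoods.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE houses (hood TEXT, year INTEGER, price INTEGER)")
# Invented rows (assumption), only shaped like the notebook's table.
conn.executemany(
    "INSERT INTO houses VALUES (?, ?, ?)",
    [
        ("A", 1900, 100),
        ("A", 1910, 200),
        ("B", 1990, 300),
        ("B", 2000, 400),
        ("C", 1950, 500),
    ],
)

# Scalar subquery in WHERE: rows newer than the overall average year.
newer = conn.execute(
    "SELECT hood, year FROM houses "
    "WHERE year > (SELECT avg(year) FROM houses) ORDER BY year"
).fetchall()

# IN subquery: rows from the two neighborhoods with the highest average year.
top_hoods = conn.execute(
    "SELECT hood, year FROM houses WHERE hood IN ("
    "  SELECT hood FROM ("
    "    SELECT hood, avg(year) AS avg_year FROM houses "
    "    GROUP BY hood ORDER BY avg_year DESC LIMIT 2"
    "  )"
    ") ORDER BY year"
).fetchall()

print(newer)
print(top_hoods)
conn.close()
```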

## New access to the table catalog

SparkSession also exposes the whole table catalog, which is similar to the Hive metastore; when connected to Hive, the Hive tables appear in it as well. The catalog is available directly from the Spark session:

In [None]:
# press Tab
spark.catalog.

A list of tables is available:

In [87]:
spark.catalog.listTables()

[Table(name='houses', database=None, description=None, tableType='TEMPORARY', isTemporary=True)]

### Exercise

1. Register the `counts` DataFrame as a temporary table; check the list of tables.
1. Drop the `counts` table; check the list of tables.

In [88]:
counts.createOrReplaceTempView('counts')
spark.catalog.listTables()

[Table(name='counts', database=None, description=None, tableType='TEMPORARY', isTemporary=True),
 Table(name='houses', database=None, description=None, tableType='TEMPORARY', isTemporary=True)]

In [91]:
spark.catalog.dropTempView('counts')
spark.catalog.listTables()

[Table(name='houses', database=None, description=None, tableType='TEMPORARY', isTemporary=True)]

Other lists are available too:

In [92]:
print("Databases: ", spark.catalog.listDatabases())
print("Functions: ", spark.catalog.listFunctions())

Databases:  [Database(name='default', description='Default Hive database', locationUri='file:/home/kodolamacz/Dokumenty/spark/spark-warehouse')]
Functions:  [Function(name='!', description=None, className='org.apache.spark.sql.catalyst.expressions.Not', isTemporary=True), Function(name='%', description=None, className='org.apache.spark.sql.catalyst.expressions.Remainder', isTemporary=True), Function(name='&', description=None, className='org.apache.spark.sql.catalyst.expressions.BitwiseAnd', isTemporary=True), Function(name='*', description=None, className='org.apache.spark.sql.catalyst.expressions.Multiply', isTemporary=True), Function(name='+', description=None, className='org.apache.spark.sql.catalyst.expressions.Add', isTemporary=True), Function(name='-', description=None, className='org.apache.spark.sql.catalyst.expressions.Subtract', isTemporary=True), Function(name='/', description=None, className='org.apache.spark.sql.catalyst.expressions.Divide', isTemporary=True), Function(na

Switching the current database is also easier:

In [93]:
# change to your login
spark.catalog.setCurrentDatabase('login')
# the call below only returns the name of the current database
spark.catalog.currentDatabase()

AnalysisException: "Database 'login' does not exist.;"

In [94]:
h = spark.table('houses')
h.show()

+--------+--------------------+--------+---------+----+------+
|    hood|                type|landArea|grossArea|year| price|
+--------+--------------------+--------+---------+----+------+
|BATHGATE|01  ONE FAMILY HOMES|    1842|     2048|1901|355000|
|BATHGATE|01  ONE FAMILY HOMES|    1103|     1290|1910|474819|
|BATHGATE|01  ONE FAMILY HOMES|    1986|     1344|1899|210000|
|BATHGATE|01  ONE FAMILY HOMES|    2329|     1431|1901|343116|
|BATHGATE|02  TWO FAMILY HOMES|    1855|     4452|1931|     0|
|BATHGATE|02  TWO FAMILY HOMES|    2000|     2400|1993|316500|
|BATHGATE|02  TWO FAMILY HOMES|    2498|     2394|1995|390000|
|BATHGATE|02  TWO FAMILY HOMES|    1542|     1542|1899|207000|
|BATHGATE|02  TWO FAMILY HOMES|    1819|     2340|1998|     0|
|BATHGATE|02  TWO FAMILY HOMES|    1667|     1296|1910|369000|
|BATHGATE|02  TWO FAMILY HOMES|    5000|     5881|1910|308000|
|BATHGATE|02  TWO FAMILY HOMES|    2483|     1512|1901|     0|
|BATHGATE|02  TWO FAMILY HOMES|    1562|     3382|2004|

In [95]:
h.write.partitionBy('hood').csv('houses.csv')

In [98]:
spark.read.csv('houses.csv').where(func.col('hood') == 'BATHGATE').show()

+--------------------+----+----+----+------+--------+
|                 _c0| _c1| _c2| _c3|   _c4|    hood|
+--------------------+----+----+----+------+--------+
|01  ONE FAMILY HOMES|1842|2048|1901|355000|BATHGATE|
|01  ONE FAMILY HOMES|1103|1290|1910|474819|BATHGATE|
|01  ONE FAMILY HOMES|1986|1344|1899|210000|BATHGATE|
|01  ONE FAMILY HOMES|2329|1431|1901|343116|BATHGATE|
|02  TWO FAMILY HOMES|1855|4452|1931|     0|BATHGATE|
|02  TWO FAMILY HOMES|2000|2400|1993|316500|BATHGATE|
|02  TWO FAMILY HOMES|2498|2394|1995|390000|BATHGATE|
|02  TWO FAMILY HOMES|1542|1542|1899|207000|BATHGATE|
|02  TWO FAMILY HOMES|1819|2340|1998|     0|BATHGATE|
|02  TWO FAMILY HOMES|1667|1296|1910|369000|BATHGATE|
|02  TWO FAMILY HOMES|5000|5881|1910|308000|BATHGATE|
|02  TWO FAMILY HOMES|2483|1512|1901|     0|BATHGATE|
|02  TWO FAMILY HOMES|1562|3382|2004|443776|BATHGATE|
|02  TWO FAMILY HOMES| 885|2655|1931|     0|BATHGATE|
|03  THREE FAMILY ...|2022|3854|1899|     0|BATHGATE|
|03  THREE FAMILY ...|3525|3