In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/spark-3.0.2/spark-3.0.2-bin-hadoop2.7.tgz
!tar xf spark-3.0.2-bin-hadoop2.7.tgz
!pip install -q findspark==1.3.0
!pip install -q pyspark==3.0.2
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.2-bin-hadoop2.7"
import findspark
import pyspark
from pyspark.sql import SparkSession
from pyspark import SparkContext

findspark.init("spark-3.0.2-bin-hadoop2.7")
sc = pyspark.SparkContext('local[*]')
spark = SparkSession.builder.appName('abc').getOrCreate()

[K     |████████████████████████████████| 204.8MB 69kB/s 
[K     |████████████████████████████████| 204kB 19.6MB/s 
[?25h  Building wheel for pyspark (setup.py) ... [?25l[?25hdone


# **RDD API**

**Plik**: 500_richest.csv

Wyświetl top 5 branż (Industry) w Stanach Zjednoczonych pod względem zsumowanego majątku najbogatszych ludzi pochodzących z tych branż (pogrupuj dane po branży i zsumuj Total Net Worth).

Podpowiedź: B (ang. billion) oznacza miliard, czyli 1000000000

**Przykładowy wynik**: 
[('Diversified', 1406660000000),
 ('Finance', 1306660000000),
 ('Energy', 906660000000),
 ('Consumer', 506660000000),
 ('Retail', 206660000000)]

In [None]:
# Load the CSV as an RDD of raw text lines (the header line is included).
richest = sc.textFile('500_richest.csv')


In [None]:
# Peek at the first five raw lines to check the delimiter and the header.
richest.take(5)

['Rank;Name;Total Net Worth;$ Last Change;$ YTD Change;Country;Industry',
 '1;Jeff Bezos;$188B;+$1.68B;-$2.31B\xa0;United States;Technology',
 '2;Elon Musk;$170B;-$2.89B;+$773M\xa0;United States;Technology',
 '3;Bernard Arnault;$155B;+$892M;+$40.9B\xa0;France;Consumer',
 '4;Bill Gates;$144B;-$1.32B;+$12.2B\xa0;United States;Technology']

In [None]:
# Turn every ';'-delimited text line into a list of field strings.
rows = richest.map(lambda line: line.split(';'))
rows.take(5)

[['Rank',
  'Name',
  'Total Net Worth',
  '$ Last Change',
  '$ YTD Change',
  'Country',
  'Industry'],
 ['1',
  'Jeff Bezos',
  '$188B',
  '+$1.68B',
  '-$2.31B\xa0',
  'United States',
  'Technology'],
 ['2',
  'Elon Musk',
  '$170B',
  '-$2.89B',
  '+$773M\xa0',
  'United States',
  'Technology'],
 ['3',
  'Bernard Arnault',
  '$155B',
  '+$892M',
  '+$40.9B\xa0',
  'France',
  'Consumer'],
 ['4',
  'Bill Gates',
  '$144B',
  '-$1.32B',
  '+$12.2B\xa0',
  'United States',
  'Technology']]

In [None]:
def bToNums(n):
  """Convert a magnitude-suffixed amount such as '188B' into a plain number.

  The trailing letter selects the multiplier (K, M, B or T, case-insensitive);
  everything before it is parsed as a float. Surrounding whitespace and a
  leading '$' / '+' are ignored, so '$7.17B' and '7.17B' give the same result.

  Args:
    n: amount text, e.g. '7.17B', '64.3B', '101B', '5B' or '773M'.

  Returns:
    The amount as a float, e.g. '101B' -> 101000000000.0.

  Raises:
    ValueError: if the text has no recognised suffix or the numeric part
      cannot be parsed (this includes negative amounts like '-$2.31B').
  """
  multipliers = {
      'K': 1000,
      'M': 1000000,
      'B': 1000000000,
      'T': 1000000000000,
  }
  text = n.strip().lstrip('+$')
  # Slicing (rather than indexing) keeps the empty string from raising IndexError.
  suffix = text[-1:].upper()
  if suffix not in multipliers:
    raise ValueError("unrecognised amount: %r" % (n,))
  return float(text[:-1]) * multipliers[suffix]

In [None]:
# Drop the header row and keep only the rows that contain 'United States'.
us_rows = rows.filter(
    lambda fields: 'Name' not in fields and 'United States' in fields
)

# Pair each industry (last field) with the parsed net worth (third field,
# leading '$' sliced off) and add the amounts up per industry.
worth_by_industry = (
    us_rows
    .map(lambda fields: (fields[-1], bToNums(fields[2][1:6])))
    .reduceByKey(lambda left, right: left + right)
)

# Largest total first; show the five richest industries.
worth_by_industry.sortBy(lambda pair: pair[1], ascending=False).take(5)

[('Technology', 1406660000000.0),
 ('Finance', 333370000000.0),
 ('Retail', 270920000000.0),
 ('Diversified', 185700000000.0),
 ('Consumer', 170300000000.0)]

In [None]:
# Sanity-check the parser on the three value shapes (x.xx, xx.x and xxx billions).
[bToNums(sample) for sample in ('7.17B', '64.3B', '101B')]

# **DataFrames**

**Plik**: 500_richest.csv

Rozpatrzmy następujące kraje: Rosja, Szwecja, Niemcy (kolumna Country: Russia, Sweden, Germany). Jeżeli uszeregujemy te zbiory pod względem Total Net Worth, to ile wynosiła największa różnica między dwoma sąsiednimi wynikami najbogatszych ludzi w każdym z tych krajów? (największa różnica w rankingu między sąsiadami dla każdego kraju).


In [None]:
# Read the ';'-separated file into a DataFrame, taking column names from the header row.
df = spark.read.options(header=True, sep=";").csv('500_richest.csv')

In [None]:
import pyspark.sql.functions as f
from pyspark.sql.types import *

In [None]:
def bToNums2(n):
  """Return the numeric part of a net-worth label, in billions, as a string.

  Intended for use as a Spark UDF with a StringType return type; the caller
  casts the result to a numeric column afterwards.

  Args:
    n: label such as '$188B', '$7.17B' or '$773M'; may be None for a null cell.

  Returns:
    The amount in billions as text ('$188B' -> '188', '$773M' -> '0.773'),
    or None when the input is None or cannot be parsed, so that a bad cell
    becomes a null instead of failing the whole Spark job.
  """
  if n is None:
    return None
  text = n.strip().lstrip('$')
  # Slicing (rather than indexing) keeps short or empty text from raising IndexError.
  suffix = text[-1:].upper()
  number = text[:-1]
  try:
    value = float(number)
  except ValueError:
    return None
  if suffix == 'B':
    # Already in billions: hand back the digits exactly as written.
    return number
  if suffix == 'M':
    return str(value / 1000)
  return None

In [None]:
# Wrap bToNums2 as a Spark UDF whose declared return type is a string column.
udf_bToNums = f.udf(bToNums2, StringType()) # required to use the function as a UDF

In [None]:
# Preview the first five rows of the DataFrame.
df.show(5)

+----+---------------+---------------+-------------+------------+-------------+----------+
|Rank|           Name|Total Net Worth|$ Last Change|$ YTD Change|      Country|  Industry|
+----+---------------+---------------+-------------+------------+-------------+----------+
|   1|     Jeff Bezos|          $188B|      +$1.68B|    -$2.31B |United States|Technology|
|   2|      Elon Musk|          $170B|      -$2.89B|     +$773M |United States|Technology|
|   3|Bernard Arnault|          $155B|       +$892M|    +$40.9B |       France|  Consumer|
|   4|     Bill Gates|          $144B|      -$1.32B|    +$12.2B |United States|Technology|
|   5|Mark Zuckerberg|          $114B|       +$203M|    +$10.9B |United States|Technology|
+----+---------------+---------------+-------------+------------+-------------+----------+
only showing top 5 rows



In [None]:
# Keep only the columns used by the ranking and run the net-worth labels
# through the parsing UDF. The result is bound to a new name so the raw `df`
# is not overwritten (the cell stays re-runnable) and so that `df_new`, which
# the ranking cell further down reads, is actually defined.
df_new = (df
 .select(f.col('Name'), f.col('Total Net Worth'), f.col('Country'))
 .withColumn('Total Net Worth', udf_bToNums(f.col('Total Net Worth')))
 )

In [None]:
import sys
from pyspark.sql.window import Window

# Both windows work per country; they differ only in the ordering column.
country_partition = Window.partitionBy('country')

# People inside a country, richest first.
windowSpec = country_partition.orderBy(f.col('total net worth').desc())

# Neighbour gaps inside a country, largest gap first.
windowSpec2 = country_partition.orderBy(f.col('difference').desc())

In [None]:
# Step 1: make net worth numeric and rank people inside each country.
ranked_by_worth = (
    df_new
    .withColumn('Total Net Worth', f.col('Total Net Worth').cast(FloatType()))
    .withColumn('row_number', f.row_number().over(windowSpec))
)

# Step 2: gap between each person and the next one down in the same country.
neighbour_gaps = (
    ranked_by_worth
    .withColumn('lead_tnw', f.lead('Total Net Worth', 1).over(windowSpec))
    .withColumn('difference', f.col('Total Net Worth') - f.col('lead_tnw'))
)

# Step 3: rank the gaps per country and keep only the largest one.
df_new_df = (
    neighbour_gaps
    .withColumn('row_number_DIFF', f.row_number().over(windowSpec2))
    .select('country', 'difference', 'row_number_DIFF')
    .where(f.col('row_number_DIFF') == 1)
)

In [None]:
# Register the per-country result as a temporary view so it can be queried with SQL.
df_new_df.createOrReplaceTempView('richest_new_df')

In [None]:
# Report the largest neighbour gap, rounded to two decimals, for the three
# countries named in the task.
sqlDF_df = spark.sql(""" 
                        select country, round(difference, 2) AS `MAX DIFF`
                        from richest_new_df
                        where country in ('Russia', 'Germany', 'Sweden')
                  """)
sqlDF_df.show()

+-------+--------+
|country|MAX DIFF|
+-------+--------+
| Russia|     7.3|
| Sweden|     8.1|
|Germany|     6.9|
+-------+--------+



**Plik**: all_weekly_excess_deaths.csv

Wyświetl top 5 krajów pod względem liczby zgonów nie związanych z covid (non_covid_deaths) w roku 2020. Do każdej krotki dodaj na końcu napis "NON COVID DEATHS IN 2020" 

Podpowiedź: Turcja źle raportuje non_covid_deaths. Dane niekoniecznie są podane w formacie całkowitoliczbowym. Usuń Turcję z prowadzonej analizy.


**Przykładowy wynik**: 

[('Britain', 9964329, 'NON COVID DEATHS IN 2020'),

 ('Germany', 2505607, 'NON COVID DEATHS IN 2020'),

 ('France', 1468112, 'NON COVID DEATHS IN 2020'),

 ('Poland', 1309358, 'NON COVID DEATHS IN 2020'),

 ('Mexico', 1065198, 'NON COVID DEATHS IN 2020')]

In [None]:
# Load the weekly excess-deaths file as an RDD of raw text lines.
# NOTE(review): the task text names 'all_weekly_excess_deaths.csv' while this
# reads a '.xls' path — confirm which file name is actually present.
deaths = sc.textFile('all_weekly_excess_deaths.xls')

In [None]:
# Turn every ';'-delimited text line into a list of field strings.
rows = deaths.map(lambda x : x.split(';'))

# Top five countries by summed non-COVID deaths in 2020, Turkey excluded as
# the task hint requires. Rows are kept when any field equals '2020'.
(rows
 .filter(lambda fields: 'Turkey' not in fields)
 .filter(lambda fields: '2020' in fields)
 # Key: first field (the country names seen in the output). Value: fourth
 # field from the end — presumably non_covid_deaths; verify against the header.
 # int(float(...)) accepts both '123' and '123.0', since the counts are not
 # guaranteed to be written as integers.
 .map(lambda fields: (fields[0], int(float(fields[-4]))))
 .reduceByKey(lambda left, right: left + right)
 .sortBy(lambda pair: pair[1], ascending = False)
 .map(lambda pair: (pair[0], pair[1], 'NON COVID DEATHS IN 2020'))
).take(5)

[('United States', 6251011, 'NON COVID DEATHS IN 2020'),
 ('Britain', 1747182, 'NON COVID DEATHS IN 2020'),
 ('Italy', 1323156, 'NON COVID DEATHS IN 2020'),
 ('France', 1200143, 'NON COVID DEATHS IN 2020'),
 ('Germany', 966878, 'NON COVID DEATHS IN 2020')]