# Fördjupning i Apache Spark
Nu när vi har gått igenom grunderna för Spark så kan vi titta på hur vi kan göra mer sofistikerad bearbetning av data och applicera analytiska funktioner. Även om Spark data frames är ett relativt nytt koncept så är funktionaliteten redan kraftfull och förbättras för varje ny release. 

### Joins 
Joins i Spark fungerar på samma sätt som i SQL och Pandas och görs genom funktionen `.join()` som finns på alla objekt av typen data frame.

In [None]:
hc = HiveContext(sc)

In [None]:
bet = hc.table('analytics_prod_11.bet')
draw = hc.table('analytics_prod_11.draw')

In [None]:
draw.printSchema()

In [None]:
join_cond = (bet['draw_number'] == draw['draw_number']) & (bet['gem_product_id'] == draw['gem_product_id'])

joined = bet.join(draw, join_cond, 'leftouter')

In [None]:
type(joined)

In [None]:
joined.printSchema()

I det här fallet joinar vi på en nyckel som heter likadant i båda tabellerna vilket innebär att vi får dubletter av fältet `draw_number`. Om vi försöker selektera detta fältet får vi ett felmeddelande enligt nedan.

In [None]:
joined.select('draw_number').printSchema()

För att hantera detta kan vi prefixa kolumnen med dess ursprungstabell enligt nedan.

In [None]:
joined.select('bet.draw_number').printSchema()

Oftast är det dock enklast att droppa eller döpa om dublettkolumnerna för att slippa få framtida fel.

In [None]:
joined = (joined.drop(joined['draw.draw_number'])
          .drop(joined['draw.dt'])
          .drop(joined['draw.gem_product_id'])
         )

In [None]:
joined.printSchema()

### Cachning av data
Spark är ett in-memory-baserat ramverk som håller data i minnet under tiden en operation exekveras. Problemet är att minnet släpps när operationen är avslutad vilket gör att man under en explorativ analys exempelvis kör exakt samma jobb flera gånger vilket adderar massor av tid. Ett sätt att komma runt detta är att cacha det datat man vill arbeta med tills man har jobbat sig igenom sin analys och i skede två släppa cachen och köra på full volym.

Nedan är ett sätt att ta ett sample av datat och cacha detta för att få upp hastigheten i efterföljande operationer.

In [None]:
cached = (joined.filter("dt >= '2014-01-01'")
          .sample(False, 0.0001)
          .cache()
          )

cached.count()

### Kolumnoperationer
För att enklare kunna komma åt alla kolumnfunktioner kan vi deklarera en ny variabel `x` till kolumnen `cached['customer_id]`. 

In [None]:
cached.limit(5).toPandas()

In [None]:
cached.customer_id

In [None]:
x = cached['customer_id']
x

Kolumner i Spark har en hel del inbyggda funktioner precis som Pandas. 

In [None]:
x.alias('test')

In [None]:
x.astype('int')

In [None]:
x.isNull()

Dessa kan användas i selekteringar enligt nedan.

In [None]:
(cached.select(cached['customer_id'].alias('kund'), cached['draw_number'].isNotNull(), 'register_ts')
 .show()
)

För att komma åt flera functioner kan modulen `functions` importeras. Jag brukar importera med alias `f` för att underlätta användningen.

In [None]:
from pyspark.sql import functions as f

In [None]:
cached.printSchema()

In [None]:
type(cached)

In [None]:
f.expr("wager_sg_1_sek + wager_sg_2_sek").alias('wager_sek')

In [None]:
cached.select(f.to_date('register_ts'),
              f.current_date(),
              'wager_sg_1_sek',
              f.lit('konstant'),
              f.year('bet.dt'),
              f.when(cached['wager_sg_1_sek'] > 50, 'BIG BET').otherwise('small bet').alias('kind_of_bet'),
              f.expr("wager_sg_1_sek + wager_sg_2_sek").alias('wager_sek')
              ).show()

Om vi vill spara resultatet av operationerna ovan behöver vi deklarera det som en ny data frame.

In [None]:
result = cached.select(f.to_date('register_ts').alias('trans_date'),
              f.current_date().alias('curr_date'),
              'wager_sg_1_sek',
              f.lit('konstant'),
              f.year('bet.dt').alias('year'),
              f.when(cached['wager_sg_1_sek'] > 50, 'BIG BET').otherwise('small bet').alias('kind_of_bet'),
              f.expr("wager_sg_1_sek + wager_sg_2_sek").alias('wager_sek')
              )

In [None]:
result.show(5)

Om vi endast vill addera en kolumn till en befintliga data fram kan vi göra det med funktionen `.withColumn()`.

In [None]:
result.withColumn('date_diff', f.datediff(result['curr_date'], result['trans_date'])).show(5)

Men även här måste vi spara vårt resultat i en ny variabel för att *spara* förändringen. Om vi kör `result.show(5)` så ser vi inte kolumnen `date_diff` ovan. Detta är ungefär samma beteende som Pandas har med undantaget att man använder inPlace. Spark är helt **immutable** vilket innebär att ett dataset aldrig kan ändras utan bara transformeras till nya dataset. Det är standard i distribuerade system och ett önskvärt beteende.

In [None]:
result.show(5)

### Hantering av nullvärden i Spark
Spark har grundläggande hantering av nullvärden vilka är grupperade under dataframefunktionen `.na`. För att droppa rader med nullvärden kan man exempelvis använda funktionen `.na.drop()`.

Vi kan introducera några nullvärden för att se hur Spark kan hantera det.

In [None]:
where_clause_1 = f.when(result['kind_of_bet'] == 'small bet', None).otherwise(result['kind_of_bet'])
where_clause_2 = f.when(result['wager_sg_1_sek'] == 25.00, None).otherwise(result['wager_sg_1_sek'])

null_result = (result.withColumn('null_col', where_clause_1)
               .withColumn('null_col_2', where_clause_2)
               )

null_result.show(50)

In [None]:
null_result.na.drop().show(50)

Ovan ser vi att Spark hittar de nullvärden vi introducerade och droppar de raderna. För att istället ersätta värden kan vi använda funktionen `.na.fill()`. 

In [None]:
null_result.na.fill(50).show(50)

Om vi studerar resultatet ovan ser vi att Spark ignorerar kolumner där datatyperna inte matchar. Det innebär att vårt numeriska värde 50 enbart appliceras på `null_col_2` och inte på den icke-numeriska kolumnen `null_col`.

Här får vi antingen applicera funktionen två gånger eller skapa en `dict` med en `kolumn : värde`-mappning enligt nedan.

In [None]:
null_result.dtypes

In [None]:
d = {'null_col' : '50', 'null_col_2' : 80}

null_result.na.fill(d).show(50)

### Aggregeringar och group by operationer

In [None]:
gb = cached.groupby('gem_product_id')
type(gb)

Alla direkta funktioner på vårt GroupedData objekt applicerar på alla numeriska kolumner. 

In [None]:
gb.sum().limit(10).toPandas()

Alternativt kan vi vara mer specifika i vad vi vill göra genom att skicka in valfria aggregeringsfunktioner så här.

In [None]:
cached.groupby('gem_product_id').agg(f.sum('wager_sg_1_sek'), f.avg('wager_sg_1_sek'), f.countDistinct('customer_id')).show(5)

Vi kan också skicka in en dict som mappar kolumn till transformation så här.

In [None]:
d = {'wager_sg_1_sek' : 'sum', 'wager_sg_2_sek' : 'mean'}

gb.agg(d).show(5)

### Pivotteringar i Spark
Spark har grundläggande funktionalitet för att skapa pivottabeller av typen vi gått igenom i Pandas. Om vi exempelvis vill summera försäljning per år kan vi göra det så här.

In [None]:
df = cached.select(f.month('dt').alias('month'), f.year('dt').alias('year'), '*')

In [None]:
pivot = (df.groupby('gem_product_id')
         .pivot('year')
         .sum('wager_sg_1_sek')
         .toPandas()
        )

pivot.head()

Vi kan även göra flera summeringar med funktionen `.agg()`.

In [None]:
pivot = (df.groupby('gem_product_id')
         .pivot('year')
         .agg(f.sum('wager_sg_1_sek'), f.countDistinct('customer_id'))
         .toPandas()
        )

pivot.head()

### Fönsterfunktioner
Spark har ett väl fungerande stöd för fönsterfunktioner av olika slag. Grundfunktionen är att man specificerar ett fönster och anger `.partitionBy()` och `.orderBy()`. Därefter har man tillgång till en mängd fönsterfunktioner i biblioteket `pyspark.sql.functions` som vi tidigare har importerat som `f`.

Därefter kan man använda funktioner som arbetar över det definierade fönstret enligt `function().over(window)` som nedan.

In [None]:
from pyspark.sql import Window

In [None]:
w = Window.partitionBy(cached['customer_id']).orderBy(cached['dt'].asc())

In [None]:
cached.select('customer_id', f.row_number().over(w).alias('row_num')).show(15)

Om vi vill räkna på spelfrekvens och varians utifrån dagar mellan speltillfällen kan vi relativt enkelt göra det genom att först räkna ut dagar mellan spel så här.

In [None]:
player_bets = (cached.select('customer_id', 'dt', f.lag('dt', 1, None).over(w).alias('last_dt')) 
               .withColumn('diff_dt', f.datediff('dt', 'last_dt'))
              )

player_bets.filter("diff_dt > 0").show(10)

När vi har de nödvändiga kolumnerna redo kan vi enkelt gruppera på `customer_id` och aggregera med avg och stddev.

In [None]:
player_freq = (player_bets.groupBy('customer_id')
               .agg(f.avg('diff_dt').alias('avg_days_between_bets'),
                    f.stddev('diff_dt').alias('stddev_days_between_bets'))
              )

player_freq.filter("avg_days_between_bets > 0 and stddev_days_between_bets != 'NaN'").show(10)

### Tidsinterval i Spark
I Spark kan man använda tidsuttryck i form av exempelvis `interval 30 days` för att räkna fram olika tidsintervall. Man använder dessa uttryck med funktionen `f.expr()` så här.

In [None]:
f.expr('interval 30 days')

Därefter kan man använda uttrycken i selectuttryck så här.

In [None]:
player_bets.select(player_bets['dt'] + f.expr('interval 90 days'), 'dt').show(5)

Eller som filteruttryck så här.

In [None]:
bets_placed_within_2_weeks = player_bets.filter("last_dt + interval 2 weeks > dt")
bets_placed_within_2_weeks.show(10)

### Bearbetning av nästlade loggar i Spark
En stor del av det data vi har i vår lake består av nästlade strukturer. För att kunna få ut rätt information är det centralt att kunna *platta ut* datat på ett korrekt sätt. Som tur är har Spark fantastiskt bra stöd för den typen av operationer vilket vi ska gå igenom här.

Vi börjar med att läsa in en loggfil innehållande speltransaktioner för `2016-05-11` på samma sätt som vi har gjort tidigare.

In [None]:
path = '/svsdata/argon_prod/ItsRegWager/dt=2016-05-11'
itsregwager = hc.jsonRDD(sc.sequenceFile(path).values(), samplingRatio=0.01)

Om vi kör `.printSchema()` ser vi hela strukturen och hur den är nästlad.

In [None]:
itsregwager.printSchema()

Vi kan börja med att ta ut `Wager` som är det fältet som innehåller speltransaktionerna och cacha det för att snabba upp efterföljande bearbetning. Om vi nu visar några rader data så ser vi att vi enbart har en kolumn på översta nivån.

In [None]:
wager = itsregwager.select('Wager').cache()
wager.show(5)

För att selektera underliggande fält kan vi använda syntaxen `fält1.fält2` för att ta oss ner i strukturen. Vi får bra resultat för de direkt underliggande fälten men för `Bets` som är en lista ser det inte lika relevant ut.

In [None]:
wager.select('Wager.CustomerId', 'Wager.PartnerId', 'Wager.Bets').show(5)

Vi kan enkelt gå ett steg djupare i hierarkin så här vilket ger något bättre överblick men vi ser tydligt här att flera rader har 2 stycken poster i fälten `ProductId` och `Amount1`. 

In [None]:
wager.select('Wager.CustomerId', 'Wager.PartnerId', 'Wager.Bets.ProductId', 'Wager.Bets.Amount1').show(5)

In [None]:
wager.select('Wager.CustomerId', 'Wager.PartnerId', 'Wager.Bets.ProductId', 'Wager.Bets.Amount1').limit(10).collect()

Det vi kan göra här är att pivotera fältet `Bets` så att vi istället för en lista får en ny rad för varje element. För att åstadkomma det kan vi använda oss av funktionen `f.explode()`. I resultatet av den operationen ser vi att vi istället för en rad för första kunden nu får två rader.

In [None]:
bets = wager.select('Wager.CustomerId', 'Wager.PartnerId', f.explode('Wager.Bets').alias('B'))
bets.show(5)

Vi kan också enkelt se effekten genom att räkna antalet rader före och efter pivotteringen.

In [None]:
print wager.count()
print bets.count()

Om vi nu väljer ut samma kolumner som vi gjorde ovan ser vi att vi inte längre har flera spel på samma rad.

In [None]:
bets.select('CustomerId', 'PartnerId', 'B.ProductId', 'B.Amount1', 'B.Amount2').show(5)

För att snygga upp det lite kan vi exempelvis göra enligt nedan för att bland annat få beloppen i kronor istället för ören.

In [None]:
final = bets.select('CustomerId', 
            'PartnerId', 
            f.split(bets['B.ProductId'], '-')[0].alias('ProductId'),
            f.split(bets['B.ProductId'], '-')[1].alias('ProductName'), 
            f.expr('B.Amount1 / 100').alias('Amount1_Sek'),
            f.expr('B.Amount2 / 100').alias('Amount2_Sek'),
           
           )
                    


#final.filter("n_boards > 1").limit(10).toPandas()

### User-defined functions
Om vi har behov av att göra något särskilt kan vi väldigt smidigt definiera egna funktioner och registrera dessa som udf:er i Spark. Exemplet nedan använde jag för att räkna fram dragningssekvenser för flerveckorsspel. 

Funktionen tar en input i form av första dragningsnumret och en integer för längden på sekvensen och returnerar en lista med integers. Vi kan testa funktionen lokalt i Python för att se att den returnerar rätt output.

In [None]:
def sequence(draw, seqlength):
    draw = int(draw)
    seq = [draw]
    
    while len(seq) < seqlength:
        next_draw = seq[-1] + 1
        seq.append(next_draw)
    
    return seq

sequence(1530, 5)

För att använda denna i Spark kan vi göra det så här. Funktionen  `f.udf()` tar en funktion och behöver definiera en `returnType` som i det här fallet är en array av integers eller `ArrayType(IntegerType())` i Sparks värld. 

Vi behöver importera dessa typer innan vi kan använda dom.

In [None]:
from pyspark.sql.types import ArrayType, IntegerType

seq = f.udf(sequence, ArrayType(IntegerType()))

Nu när vi har registrerat funktionen kan vi lätt använda den i exempelvis en select.

In [None]:
t = final.select('ProductId', 
             seq('ProductId', f.lit(3)).alias('tmp')
            )

t.show(10)

In [None]:
t.select('ProductId', f.explode('tmp')).show(10)