# SPARK - adatelokeszitesi szakasz

In [1]:
import json
import re
import string

import pandas as pd
import numpy as np

In [2]:
sc = SparkContext.getOrCreate()

In [3]:
# Itt a "sc" alapbol letezik, ez az ugynevezett SparkContext
df=sc.textFile("../data/jokecomments2014.txt").repartition(8)

Ha rossz fájlnévvel futtatom le, akkor nem ad errort. Csak megjegyzi a feladatot, de nem futtatja le.

In [4]:
#(1) Nezzunk bele - take(2) ket sort ad vissza belole

In [4]:
df.take(2)

['(u"Why can\'t Ray Charles see his friends?\\n\\nBecause he\'s married.", 18, u\'1388584109\', u\'t1_ceembm2\', u\'t1_ceeqkew\', u\'t3_1u4u78\')',
 "(u'I am laughing too (literally).\\n\\nNow onto the puns:\\n\\nThis joke blew me away!', 11, u'1388587512', u't1_ceeh9ix', u't1_ceer4bz', u't3_1u4eex')"]

Ez nem 'head' mivel elosztott forrásnál az első sor nehez értelmezhető.

A taket itt arra is használjuk, hogy az esetleges bugok forrását könnyebben felismerjük (a spark nem nagyon túl felhasználóbarát az error jelentéseiben).

Az eredménye nem lista, hanem tuple (fix elemszámú, nem módosítható lista).

A map függvény egy új RDD-t fog létrehozni.

In [6]:
#(2) Egy kicsit rendbe kell kapni az adatokat, DATA PREPARATION
#Ket lepesben hajtjuk vegre, itt a kulcs a "map" fuggveny

In [9]:
dfALMA=df.map(lambda x: eval(x))
dfALMA.take(2)

[("Why can't Ray Charles see his friends?\n\nBecause he's married.",
  18,
  '1388584109',
  't1_ceembm2',
  't1_ceeqkew',
  't3_1u4u78'),
 ('I am laughing too (literally).\n\nNow onto the puns:\n\nThis joke blew me away!',
  11,
  '1388587512',
  't1_ceeh9ix',
  't1_ceer4bz',
  't3_1u4eex')]

Egy sor egy ötösből tud álllni.

In [10]:
#Kiszedjuk a furcsa karaktereket, kisbetusitunk, kiszedjuk az irasjeleket

In [41]:
def string_ok(x):
    s=x
    re.sub(r'\W+', '', s)
    s=s.replace('\n'," ")
    s=s.replace('\t'," ")
    s=s.replace('\r'," ")
    exclude = set(string.punctuation)
    s2 = ''.join(ch for ch in s if ch not in exclude)
    s2=s2.lower()
    s2=s2.strip()
    return s2

dfBANAN=dfALMA.map(lambda x: (string_ok(x[0]),x[1]))
dfBANAN.take(2)

[('why cant ray charles see his friends  because hes married', 18),
 ('i am laughing too literally  now onto the puns  this joke blew me away',
  11)]

In [42]:
#(3) Nezzunk nehany lehuzott kommentet. Irjunk ki nehanyat, aminek 
# negativ a score-ja
#
# EHHEZ: filter fuggveny, benne fuggveny, ami logikai erteket ad vissza

In [43]:
dfBANAN.filter(lambda x: x[1] < 0).take(5)

[('this joke sucks', -19),
 ('deleted', -18),
 ('which came first the chicken or the egg', -15),
 ('deleted', -36),
 ('americans build a tower in two week plane take it down in two minutes',
  -20)]

In [44]:
dfBANAN.count()

44015

In [45]:
#(4) Mi lenne, ha egy sor inkabb egy szo lenne. Egy olyan map fuggveny
# kell, ami nem egy sort general ujra, hanem akarhanyat (0... sok)
# ez a flatMap
#
# Itt egy pelda, ertelmezzuk

In [46]:
def szo_es_score(x):
    result=[]
    for i in x[0].split(" "):
        if x[1]<0:
            result.append((i,(x[1],0,1))) # Ha negatív, az első elem a rate
        else:
            result.append((i,(0,x[1],1))) # Ha pozitív, a második elem a rate
    return result

dfCITROM=dfBANAN.flatMap(szo_es_score)

dfCITROM.take(6)

[('why', (0, 18, 1)),
 ('cant', (0, 18, 1)),
 ('ray', (0, 18, 1)),
 ('charles', (0, 18, 1)),
 ('see', (0, 18, 1)),
 ('his', (0, 18, 1))]

In [47]:
dfCITROM.count()

846431

In [48]:
dfBANAN.filter(lambda x: x[1] < 0).flatMap(szo_es_score).take(5)

[('this', (-19, 0, 1)),
 ('joke', (-19, 0, 1)),
 ('sucks', (-19, 0, 1)),
 ('deleted', (-18, 0, 1)),
 ('which', (-15, 0, 1))]

In [49]:
#(5) Es ha van, van reduce is. Leggyakrabban a reduceByKey-t hasznaljuk.


reduceByKey(f):

(k, v1)
(k, v2)

ezekbol csinal egzy k, v3-at

v1, v2 -> v3

(a, b, c), (d, e, f) -> (a+d, b+e, c+f)

In [50]:
def okos_osszeado(x, y):
    return (x[0] + y[0], x[1] + y[1], x[2] + y[2])

In [51]:
dfDIO = dfCITROM.reduceByKey(okos_osszeado).filter(lambda x: x[0] != "")
dfDIO.take(10)

[('why', (-4649, 108242, 1680)),
 ('married', (-283, 21829, 212)),
 ('i', (-41799, 1306044, 17357)),
 ('now', (-2760, 101705, 1333)),
 ('blew', (-15, 4765, 40)),
 ('rekt', (-171, 2195, 51)),
 ('handcuffing', (0, 11, 1)),
 ('edit', (-6771, 78734, 737)),
 ('im', (-9203, 204081, 2646)),
 ('quality', (-53, 4843, 49))]

In [52]:
#(6) Rendezetten is lekerdezhetunk: itt egy pelda. Elozo eredmenyekbol
#kiindulva nezzuk meg, mi szedte ossze a legtobb pozitiv es legtobb 
#negativ pontot

In [53]:
dfCITROM.takeOrdered(5,key=lambda x: -x[1][1])

[('as', (0, 6764, 1)),
 ('a', (0, 6764, 1)),
 ('dad', (0, 6764, 1)),
 ('and', (0, 6764, 1)),
 ('a', (0, 6764, 1))]

In [65]:
dfDIO.takeOrdered(5,key=lambda x: -x[1][1])

[('the', (-72481, 2889723, 37032)),
 ('a', (-51889, 1888037, 24587)),
 ('to', (-46326, 1491265, 18440)),
 ('and', (-35400, 1333927, 16443)),
 ('i', (-41799, 1306044, 17357))]

In [69]:
def okos_score(x):
    return (x[0], (x[1][1] + 1) / (-1 * x[1][0] + 1), x[1][2])

dfEPER = dfDIO.map(okos_score)
dfEPER.take(10)

[('why', 23.27806451612903, 1680),
 ('married', 76.86619718309859, 212),
 ('i', 31.245095693779906, 17357),
 ('now', 36.83665338645418, 1333),
 ('blew', 297.875, 40),
 ('rekt', 12.767441860465116, 51),
 ('handcuffing', 12.0, 1),
 ('edit', 11.626550502067335, 737),
 ('im', 22.173185571490656, 2646),
 ('quality', 89.70370370370371, 49)]

In [70]:
dfEPER.takeOrdered(10, key=lambda x: x[1])

[('bruhs', 0.00130718954248366, 3),
 ('edit3', 0.001455604075691412, 3),
 ('willingly', 0.0014858841010401188, 2),
 ('edit4look', 0.001564945226917058, 1),
 ('flexed', 0.001564945226917058, 1),
 ('httpwwwredditcomwikireddiquette', 0.0016260162601626016, 2),
 ('inflict', 0.002785515320334262, 1),
 ('incessant', 0.002785515320334262, 1),
 ('shitlords', 0.0034482758620689655, 3),
 ('regenerate', 0.0034965034965034965, 2)]

In [None]:
dfFUGE = dfEPER.filter(lambda x: x[2] > 300)
dfFUGE.takeOrdered(10, key=lambda x: -x[1])

a 'modellezes' notebookban folytatodik

collect(): veszi az összes eredményt (ami ügye it elég sok lehet)

In [25]:
#(7) Gyartsunk egy aranyszamot a pozitiv, negativ aranyok modellezesere

In [26]:
#(8) Keressuk ki azon szavakat, amik legalabb 300-szor elofordultak
# es nagyon extrem pozitiv vagy negativ komment statisztikajuk van