In [1]:
from pyspark import SparkConf, SparkContext
conf = SparkConf().setMaster("local").setAppName("Mi programa")
sc = SparkContext(conf = conf)

# Joins

In [16]:
# Creamos dos objetos con una clave(a,b,c,d) y valor
x = sc.parallelize([("a",5), ("b",6), ("c",7),("d",8)])
y = sc.parallelize([("a",1), ("a",2), ("c",3)])

In [17]:
# x añadiendo y pero solo en los valores que tengan la misma clave
x.join(y).collect()

[('c', (7, 3)), ('a', (5, 1)), ('a', (5, 2))]

In [18]:
# y añadiendo x pero solo en los valores que tengan la misma clave
y.join(x).collect()

[('c', (3, 7)), ('a', (1, 5)), ('a', (2, 5))]

In [19]:
# Aca hacemos un left join toma todos los valores de "y" y los pasa a x si hay un valor que esta en x pero no en y no lo pone
y.leftOuterJoin(x).collect()

[('c', (3, 7)), ('a', (1, 5)), ('a', (2, 5))]

In [20]:
# Aca hacemos un left join toma todos los valores de "x" y los pasa a x si hay un valor que esta en y pero no en x no lo pone
x.leftOuterJoin(y).collect()

[('b', (6, None)),
 ('c', (7, 3)),
 ('d', (8, None)),
 ('a', (5, 1)),
 ('a', (5, 2))]

In [21]:
y.rightOuterJoin(x).collect()

[('c', (3, 7)),
 ('b', (None, 6)),
 ('d', (None, 8)),
 ('a', (1, 5)),
 ('a', (2, 5))]

# Acumuladores

In [22]:
lines = sc.textFile("ejemplo.txt")

In [23]:
py = sc.accumulator(0)
sp = sc.accumulator(0)

In [24]:
# Saber si una linea tiene python y spark
def lenguajes(linea):
    global py,sp
    if "Python" in linea:
        py += 1
        if "Spark" in linea:
            sp += 1 
        return True
    if "Spark" in linea:
        sp += 1
        return True
    else:
        return False

In [25]:
valores = lines.filter(lenguajes)

In [26]:
# Nos muestra las filas donde aparece lo que buscamos
valores.collect()

['Apache Spark',
 'Apache SparkSpark Logo',
 'Developer(s)\tApache Spark',
 'Repository\tSpark Repository',
 'Available in\tScala, Java, SQL, Python, R, C#, F#',
 "Apache Spark is an open-source distributed general-purpose cluster-computing framework. Spark provides an interface for programming entire clusters with implicit data parallelism and fault tolerance. Originally developed at the University of California, Berkeley's AMPLab, the Spark codebase was later donated to the Apache Software Foundation, which has maintained it since. "]

In [27]:
# Values son las veces que aparece Python en el texto
py

Accumulator<id=0, value=1>

In [28]:
# Values son las veces que aparece Spark en el texto
sp

Accumulator<id=1, value=5>

#  Funciones map

las funciones map son las que procesan los datos almacenados aplicando filtros y ordenaciones

In [29]:
from pyspark.sql.types import StringType
from pyspark import SQLContext

In [30]:
sqlContext = SQLContext(sc)

In [31]:
dfspark = sqlContext.read.format("csv").option("header", "true").option("inferSchema","true").load("C:/Users/lucho/Desktop/Programacion/Python/Linkedin Data Science/base_datos_2008.csv")
dfspark = dfspark.sample(fraction = 0.001, withReplacement = False)
dfspark = dfspark.withColumn("ArrDelay",dfspark["ArrDelay"].cast("integer"))
dfspark = dfspark.withColumn("DepDelay",dfspark["DepDelay"].cast("integer"))

df2 = dfspark.na.drop(subset=["ArrDelay","DepDelay","Distance"])
df2 = df2.filter("ArrDelay is not null")
df2 = df2.dropDuplicates()

In [32]:
# generamos un array paralelizado
A = sc.parallelize(df2.select("Origin").rdd.collect())

In [33]:
A.persist()

ParallelCollectionRDD[164] at readRDDFromFile at PythonRDD.scala:262

Funciones map

In [34]:
# Le agregamos un 1 a toda la lista de origenes
mapfunction = A.map(lambda x: (x,1))

In [35]:
mapfunction.collect()

[(Row(Origin='BWI'), 1),
 (Row(Origin='GPT'), 1),
 (Row(Origin='HPN'), 1),
 (Row(Origin='LAX'), 1),
 (Row(Origin='MIA'), 1),
 (Row(Origin='ATL'), 1),
 (Row(Origin='DTW'), 1),
 (Row(Origin='GSO'), 1),
 (Row(Origin='TUL'), 1),
 (Row(Origin='ICT'), 1),
 (Row(Origin='LGA'), 1),
 (Row(Origin='ORF'), 1),
 (Row(Origin='PHX'), 1),
 (Row(Origin='CHS'), 1),
 (Row(Origin='SMF'), 1),
 (Row(Origin='STL'), 1),
 (Row(Origin='ORD'), 1),
 (Row(Origin='DAL'), 1),
 (Row(Origin='DAL'), 1),
 (Row(Origin='ATL'), 1),
 (Row(Origin='IND'), 1),
 (Row(Origin='EWR'), 1),
 (Row(Origin='MKE'), 1),
 (Row(Origin='LAS'), 1),
 (Row(Origin='IAH'), 1),
 (Row(Origin='BOS'), 1),
 (Row(Origin='MCO'), 1),
 (Row(Origin='CVG'), 1),
 (Row(Origin='SAT'), 1),
 (Row(Origin='ATL'), 1),
 (Row(Origin='BOS'), 1),
 (Row(Origin='MSP'), 1),
 (Row(Origin='DFW'), 1),
 (Row(Origin='TPA'), 1),
 (Row(Origin='LGB'), 1),
 (Row(Origin='BDL'), 1),
 (Row(Origin='JFK'), 1),
 (Row(Origin='LAX'), 1),
 (Row(Origin='LGA'), 1),
 (Row(Origin='BOS'), 1),


In [39]:
# Agregamos para que ponga distinto valores para dode queramos
def fun(x):
    if x[0] in ["SEA", "ATL", "HOU"]:
        return ((x,2))
    elif x[0] == "DEN":
        return ((x,3))
    else:
        return ((x,1))

In [40]:
mapfunction2 = A.map(fun)

In [41]:
mapfunction2.collect()

[(Row(Origin='BWI'), 1),
 (Row(Origin='GPT'), 1),
 (Row(Origin='HPN'), 1),
 (Row(Origin='LAX'), 1),
 (Row(Origin='MIA'), 1),
 (Row(Origin='ATL'), 2),
 (Row(Origin='DTW'), 1),
 (Row(Origin='GSO'), 1),
 (Row(Origin='TUL'), 1),
 (Row(Origin='ICT'), 1),
 (Row(Origin='LGA'), 1),
 (Row(Origin='ORF'), 1),
 (Row(Origin='PHX'), 1),
 (Row(Origin='CHS'), 1),
 (Row(Origin='SMF'), 1),
 (Row(Origin='STL'), 1),
 (Row(Origin='ORD'), 1),
 (Row(Origin='DAL'), 1),
 (Row(Origin='DAL'), 1),
 (Row(Origin='ATL'), 2),
 (Row(Origin='IND'), 1),
 (Row(Origin='EWR'), 1),
 (Row(Origin='MKE'), 1),
 (Row(Origin='LAS'), 1),
 (Row(Origin='IAH'), 1),
 (Row(Origin='BOS'), 1),
 (Row(Origin='MCO'), 1),
 (Row(Origin='CVG'), 1),
 (Row(Origin='SAT'), 1),
 (Row(Origin='ATL'), 2),
 (Row(Origin='BOS'), 1),
 (Row(Origin='MSP'), 1),
 (Row(Origin='DFW'), 1),
 (Row(Origin='TPA'), 1),
 (Row(Origin='LGB'), 1),
 (Row(Origin='BDL'), 1),
 (Row(Origin='JFK'), 1),
 (Row(Origin='LAX'), 1),
 (Row(Origin='LGA'), 1),
 (Row(Origin='BOS'), 1),


# Funciones reduce

Vamos a usar las funciones map de arriba

In [45]:
# Nos da las veces que se repite cada aeropuerto
reducefunction = mapfunction.reduceByKey(lambda x,y: x+y)

In [46]:
reducefunction.collect()

[(Row(Origin='BWI'), 118),
 (Row(Origin='GPT'), 11),
 (Row(Origin='HPN'), 9),
 (Row(Origin='LAX'), 208),
 (Row(Origin='MIA'), 58),
 (Row(Origin='ATL'), 398),
 (Row(Origin='DTW'), 150),
 (Row(Origin='GSO'), 11),
 (Row(Origin='TUL'), 22),
 (Row(Origin='ICT'), 21),
 (Row(Origin='LGA'), 118),
 (Row(Origin='ORF'), 14),
 (Row(Origin='PHX'), 198),
 (Row(Origin='CHS'), 18),
 (Row(Origin='SMF'), 48),
 (Row(Origin='STL'), 75),
 (Row(Origin='ORD'), 309),
 (Row(Origin='DAL'), 46),
 (Row(Origin='IND'), 47),
 (Row(Origin='EWR'), 140),
 (Row(Origin='MKE'), 52),
 (Row(Origin='LAS'), 177),
 (Row(Origin='IAH'), 187),
 (Row(Origin='BOS'), 117),
 (Row(Origin='MCO'), 129),
 (Row(Origin='CVG'), 90),
 (Row(Origin='SAT'), 63),
 (Row(Origin='MSP'), 127),
 (Row(Origin='DFW'), 238),
 (Row(Origin='TPA'), 78),
 (Row(Origin='LGB'), 15),
 (Row(Origin='BDL'), 31),
 (Row(Origin='JFK'), 118),
 (Row(Origin='CLT'), 125),
 (Row(Origin='ONT'), 37),
 (Row(Origin='FAR'), 13),
 (Row(Origin='PVD'), 28),
 (Row(Origin='HOU'), 40

Aca lo hacemos con el otro dataset

In [49]:
reducefunction2 = mapfunction2.reduceByKey(lambda x,y: x+y)

In [50]:
reducefunction2.collect()

[(Row(Origin='BWI'), 118),
 (Row(Origin='GPT'), 11),
 (Row(Origin='HPN'), 9),
 (Row(Origin='LAX'), 208),
 (Row(Origin='MIA'), 58),
 (Row(Origin='ATL'), 796),
 (Row(Origin='DTW'), 150),
 (Row(Origin='GSO'), 11),
 (Row(Origin='TUL'), 22),
 (Row(Origin='ICT'), 21),
 (Row(Origin='LGA'), 118),
 (Row(Origin='ORF'), 14),
 (Row(Origin='PHX'), 198),
 (Row(Origin='CHS'), 18),
 (Row(Origin='SMF'), 48),
 (Row(Origin='STL'), 75),
 (Row(Origin='ORD'), 309),
 (Row(Origin='DAL'), 46),
 (Row(Origin='IND'), 47),
 (Row(Origin='EWR'), 140),
 (Row(Origin='MKE'), 52),
 (Row(Origin='LAS'), 177),
 (Row(Origin='IAH'), 187),
 (Row(Origin='BOS'), 117),
 (Row(Origin='MCO'), 129),
 (Row(Origin='CVG'), 90),
 (Row(Origin='SAT'), 63),
 (Row(Origin='MSP'), 127),
 (Row(Origin='DFW'), 238),
 (Row(Origin='TPA'), 78),
 (Row(Origin='LGB'), 15),
 (Row(Origin='BDL'), 31),
 (Row(Origin='JFK'), 118),
 (Row(Origin='CLT'), 125),
 (Row(Origin='ONT'), 37),
 (Row(Origin='FAR'), 13),
 (Row(Origin='PVD'), 28),
 (Row(Origin='HOU'), 80

In [54]:
# Mostramos en orden del aeropuerto
reducefunction.sortByKey().take(10)

[(Row(Origin='ABE'), 4),
 (Row(Origin='ABI'), 3),
 (Row(Origin='ABQ'), 54),
 (Row(Origin='ACK'), 1),
 (Row(Origin='ACT'), 1),
 (Row(Origin='ACV'), 8),
 (Row(Origin='AEX'), 2),
 (Row(Origin='AGS'), 7),
 (Row(Origin='ALB'), 12),
 (Row(Origin='AMA'), 9)]

In [57]:
# Nos muestra los aeropuertos con mas salidas en orden descendente
reducefunction.sortBy(lambda x:x[1], ascending = False).take(10)

[(Row(Origin='ATL'), 398),
 (Row(Origin='ORD'), 309),
 (Row(Origin='DEN'), 240),
 (Row(Origin='DFW'), 238),
 (Row(Origin='LAX'), 208),
 (Row(Origin='PHX'), 198),
 (Row(Origin='IAH'), 187),
 (Row(Origin='LAS'), 177),
 (Row(Origin='DTW'), 150),
 (Row(Origin='EWR'), 140)]

Si lo hacemos con la otra dataframe nos da los valores que tiene 2 el doble como ATL

In [58]:
reducefunction2.sortBy(lambda x:x[1], ascending = False).take(10)

[(Row(Origin='ATL'), 796),
 (Row(Origin='DEN'), 720),
 (Row(Origin='ORD'), 309),
 (Row(Origin='DFW'), 238),
 (Row(Origin='SEA'), 222),
 (Row(Origin='LAX'), 208),
 (Row(Origin='PHX'), 198),
 (Row(Origin='IAH'), 187),
 (Row(Origin='LAS'), 177),
 (Row(Origin='DTW'), 150)]

# Ejemplos basico de mapReduce

Vamos a usar el archivo lines que es de la wikipedia 

In [59]:
lines.getNumPartitions()

1

In [60]:
py = sc.accumulator(0)
sp = sc.accumulator(0)
# Saber si una linea tiene python y spark
def lenguajes(linea):
    global py,sp
    if "Python" in linea:
        py += 1
        if "Spark" in linea:
            sp += 1 
        return True
    if "Spark" in linea:
        sp += 1
        return True
    else:
        return False
    
    
valores = lines.filter(lenguajes)

In [61]:
valores.count()

6

In [62]:
py

Accumulator<id=2, value=1>

In [63]:
sp

Accumulator<id=3, value=5>

In [69]:
# Nos cuenta todos los valores que hay
def lenguajes_map(x):
    if "Python" in x and "Spark" in x:
        return("Count",(1,1))
    elif "Spark" in x:
        return("Count",(0,1))
    elif "Python" in x:
        return("Count",(1,0))
    else:
        return("Count", (0,0))

mapfun = lines.map(lenguajes_map)

In [70]:
mapfun.count()

19

In [68]:
mapfun.reduceByKey(lambda x,y: (x[0]+y[0], x[1]+y[1])).collect()

[('Count', (1, 5))]