<div align="center">

[![Open In Colab](https://colab.research.google.com/assets/colab-badge.png)](https://colab.research.google.com/github/wisaaco/AA_DistributedSystems_Lab/blob/main/U9-Spark/SparkExamples.ipynb)

If the button does not work, you can copy the following [link](https://colab.research.google.com/github/wisaaco/AA_DistributedSystems_Lab/blob/main/U9-Spark/SparkExamples.ipynb)

</div>

References:
- https://spark.apache.org/docs/latest/api/python/reference/index.html

In [None]:
!apt-get install openjdk-17-jdk-headless -qq > /dev/null
!wget -q https://dlcdn.apache.org/spark/spark-3.5.7/spark-3.5.7-bin-hadoop3.tgz
!tar xf spark-3.5.7-bin-hadoop3.tgz

In [None]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-17-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.5.7-bin-hadoop3"
os.environ["PATH"] += os.pathsep + os.path.join(os.environ["SPARK_HOME"], "bin")

In [None]:
from pyspark import SparkConf, SparkContext
conf = SparkConf().setMaster("local").setAppName("My App")
# "local": a single thread
# "local[2]": 2 threads
# "local[*]": as many threads as the machine has cores

In [None]:
sc = SparkContext.getOrCreate(conf=conf)
# If a SparkContext already exists, it is reused instead of creating a new one
# sc.stop()

In [None]:
data = sc.textFile("sample_data/README.md")

In [None]:
type(data) # Resilient Distributed Dataset

In [None]:
len(data)  # raises TypeError: an RDD has no len(); use data.count() instead

In [None]:
for line in data:  # raises TypeError: an RDD is not directly iterable
    print(line)

In [None]:
for line in data.collect():
  print(line)

In [None]:
data2 = sc.parallelize(["pandas", "i like pandas"])
type(data2)

In [None]:
len(data2)  # raises TypeError as well; use data2.count()

# RDD
A **Resilient Distributed Dataset (RDD)** is the basic abstraction in Spark: an immutable, partitioned collection of elements that can be operated on in parallel.


- https://spark.apache.org/docs/latest/rdd-programming-guide.html

## Operations
- **Actions** are operations that return non-RDD values. The result is returned to the driver program or written to an external storage system. Because RDDs are evaluated lazily, calling an action is what triggers the actual computation.<br/>
```python
lines.count()  # returns an int
```

- **Transformations** are operations that take an existing RDD as input and return one or more new RDDs. Transformed RDDs are computed lazily, only when an action uses them. Many transformations are element-wise, that is, they work on one element at a time, but this is not true for all transformations.
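
Lazy evaluation can be illustrated without a Spark installation. The sketch below is a plain-Python analogy, not Spark code: `LazyPipeline` and `traced_square` are hypothetical names introduced only for illustration. Transformations merely record a step, and the recorded steps run only when an action such as `collect()` is called.

```python
calls = []  # records every element that is actually processed


def traced_square(x):
    """Square x and remember that the function was really executed."""
    calls.append(x)
    return x * x


class LazyPipeline:
    """Toy stand-in for an RDD (hypothetical, for illustration only)."""

    def __init__(self, source, steps=()):
        self.source = source  # any iterable
        self.steps = steps    # recorded transformations: iterator -> iterator

    def map(self, func):
        # Transformation: only records the step, nothing is computed here.
        return LazyPipeline(self.source, self.steps + (lambda it: map(func, it),))

    def filter(self, pred):
        # Transformation: only records the step, nothing is computed here.
        return LazyPipeline(self.source, self.steps + (lambda it: filter(pred, it),))

    def collect(self):
        # Action: runs every recorded step and materializes the result.
        it = iter(self.source)
        for step in self.steps:
            it = step(it)
        return list(it)


pipeline = LazyPipeline([1, 2, 3, 4]).map(traced_square).filter(lambda x: x > 4)
calls_before_action = list(calls)  # still empty: no element has been processed
result = pipeline.collect()        # the action triggers the computation
print(calls_before_action, result, calls)
```

Building `pipeline` processes no element at all; `traced_square` is only invoked once `collect()` runs.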

**Transformations**: filter()

In [None]:
inputRDD = sc.textFile("sample_data/README.md")
samplesRDD = inputRDD.filter(lambda x: "sample" in x)
type(samplesRDD)

## Actions
Actions are the operations that return a final value to the driver program or write data to an external storage system. Actions force the evaluation of the transformations required for the RDD they were called on, since they need to actually produce output.

- collect()
- count()
- countByValue()
- take(num)
- top(num)
- takeOrdered(num, key=None)
- takeSample(...)
- reduce()
- fold()
- aggregate()
- foreach()


In [None]:
for line in inputRDD.collect():
  print(line)
  break

In [None]:
print("Total Input: %i "%samplesRDD.count())


In [None]:
for line in samplesRDD.take(3):
  print(line)

In [None]:
samplesRDD.top(2)

In [None]:
nums = sc.parallelize([1, 2, 2, 2])
nums.countByValue()

**reduce()** takes a function that operates on two elements of the type in your RDD and returns a new element of the same type.

In [None]:
nums = sc.parallelize([1, 2, 3, 4])
total = nums.reduce(lambda x, y: x + y)  # "total" avoids shadowing the built-in sum()
print(total)

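**fold()** works like reduce(), but it also takes a neutral “zero value” that is used for the initial call on each partition and once more when the partition results are combined.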

In [None]:
nums = sc.parallelize([1, 2, 3, 4])
print(nums.getNumPartitions())
nums = sc.parallelize([1, 2, 3, 4],4)
print(nums.getNumPartitions())

Start (zero value): **1**, with a single partition:
```
(1 + 1) = 2
(2 + 2) = 4
(4 + 3) = 7
(7 + 4) = 11
```

Final result = 11 + 1 (final zeroValue) = 12

In [None]:
nums = sc.parallelize([1, 2, 3, 4], 2)
total = nums.fold(1, lambda x, y: x + y)  # "total" avoids shadowing the built-in sum()
print(total)

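With two partitions, the zero value is applied once per partition and once more when the partial results are combined: (1+1+2) + (1+3+4) + 1 = 13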
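
The effect of the zero value can be reproduced without Spark. The following is a minimal plain-Python sketch, assuming the partition layouts `[[1, 2, 3, 4]]` and `[[1, 2], [3, 4]]`; `simulate_fold` is a hypothetical helper that mimics the behaviour described above and is not part of the PySpark API.

```python
from functools import reduce


def simulate_fold(partitions, zero_value, op):
    """Fold each partition starting from zero_value, then fold the
    partial results starting from zero_value once more."""
    partials = [reduce(op, part, zero_value) for part in partitions]
    return reduce(op, partials, zero_value)


def add(x, y):
    return x + y


one_partition = simulate_fold([[1, 2, 3, 4]], 1, add)     # 11 + 1
two_partitions = simulate_fold([[1, 2], [3, 4]], 1, add)  # 4 + 8 + 1
neutral_zero = simulate_fold([[1, 2], [3, 4]], 0, add)    # 0 is neutral for +
print(one_partition, two_partitions, neutral_zero)
```

Only a truly neutral zero value (0 for addition) makes the result independent of the number of partitions.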

The **aggregate()** function frees us from the constraint that the return value must be of the same type as the elements of the RDD we are working on.<br/>
https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.aggregate.html

In [None]:
nums = sc.parallelize([1, 2, 3, 4],2)

seqOp = lambda acc, num: (acc[0] + num, acc[1] + 1) # a function used to accumulate results within a partition
combOp = lambda partition_1, partition_2: (partition_1[0] + partition_2[0], partition_1[1] + partition_2[1]) #an associative function used to combine results from different partitions

sumCount = nums.aggregate((0, 0),seqOp,combOp) #ZeroValue,Inside the partition, Combining partitions

print(type(sumCount))
print(len(sumCount))
print(sumCount)
print("---")
sc.parallelize([]).aggregate((1, 0), seqOp, combOp)

With two partitions, we could have:
- partition 1: [1,2] 
- partition 2: [3,4]

SeqOp:
- partition 1:
```
  seqOp((0,0),1) = (0+1,0+1) = (1,1)
  seqOp((1,1),2) = (1+2,1+1) = (3,2)
```
- partition 2: 
```
  seqOp((0,0),3) = (0+3,0+1) = (3,1)
  seqOp((3,1),4) = (3+4,1+1) = (7,2)
```

CombOp:
```
combOp((3,2),(7,2)) = (3+7, 2+2) = (10,4)
```
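
The walkthrough above can be replayed in plain Python. This is a sketch under the same assumption (partitions `[1, 2]` and `[3, 4]`); `simulate_aggregate` is a hypothetical helper, not a PySpark function.

```python
from functools import reduce


def simulate_aggregate(partitions, zero_value, seq_op, comb_op):
    """Apply seq_op inside each partition, then merge the partial
    accumulators with comb_op, both starting from zero_value."""
    partials = [reduce(seq_op, part, zero_value) for part in partitions]
    return reduce(comb_op, partials, zero_value)


def seq_op(acc, num):
    # Accumulate (running sum, running count) within a partition.
    return (acc[0] + num, acc[1] + 1)


def comb_op(left, right):
    # Merge the accumulators of two partitions.
    return (left[0] + right[0], left[1] + right[1])


total, count = simulate_aggregate([[1, 2], [3, 4]], (0, 0), seq_op, comb_op)
mean = total / count
print(total, count, mean)
```

The accumulator is a `(sum, count)` tuple although the elements are integers, which is what makes computing the mean in a single pass possible.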

Reflection

In [None]:
rdd = sc.parallelize([1, 2, 3, 4, 5 ,6, 7, 8, 9, 10], 3) #3 partitions
rdd.glom().collect() #Glom. Return an RDD created by coalescing all elements within each partition into a list.


In [None]:
seqOp = lambda acc, num: (acc[0] * num, acc[1] + 1) 
combOp = lambda partition_1, partition_2: (partition_1[0] + partition_2[0], partition_1[1] + partition_2[1]) #an associative function used to combine results from different partitions
unk_op = rdd.aggregate(
  (0, -1), seqOp, combOp
)
# result = ?

In [None]:
unk_op

### Activity

In [None]:
#A. Compute [min,max,len] of:
ages = [3,23,42,12,34,50,19,97,1,94,35,65,87]

#TIP:
import math
print(math.inf)

In [None]:
#B. Compute [sum(sales > 300$),len(sales >300)]
sales = [300,200,1094,390,29,320,90,10029]

## Transformations
Narrow transformations:
- map
- flatMap
- mapPartitions
- filter
- sample
- union

In [None]:
nums = sc.parallelize([1, 2, 3, 4])
squared = nums.map(lambda x: x * x)
print(squared.collect())


In [None]:
lines = sc.parallelize(["hello world", "hi Pepe, how are you?"])
words = lines.flatMap(lambda line: line.split(" "))
print(words.collect())


In [None]:
# Return a new RDD by applying a function to each partition of this RDD.
rdd = sc.parallelize([(1, 2), (3, 4), (5, 6), (7, 8)], 2)
print(rdd.glom().collect() )
def average_partition(iterator):
     x_sum = 0
     y_sum = 0
     count = 0
     for (x, y) in iterator:
         x_sum += x
         y_sum += y
         count += 1
     yield (x_sum/count, y_sum/count)

avg_rdd = rdd.mapPartitions(average_partition)
print(avg_rdd.collect())


In [None]:
lines = sc.parallelize(["hello world", "hi Pepe, who are you?"])
lines.filter(lambda x: "wo" in x).collect()


In [None]:
# https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.sample.html
rdd = sc.parallelize([1,2,3,4,5,6,7,8,9,10])
rdd.sample(False,0.5,seed=1).collect()


union(), distinct(), intersection(), subtract(), cartesian()
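
The semantics of these operations can be sketched with plain Python lists. This is an analogy only: the variable names are illustrative, and results are sorted where needed for readability because an RDD gives no ordering guarantee for them.

```python
from itertools import product

a = [1, 2, 2, 3]
b = [3, 4]
b_set = set(b)

union = a + b                               # like union(): duplicates are kept
distinct = sorted(set(a))                   # like distinct(): duplicates are removed
intersection = sorted(set(a) & b_set)       # like intersection(): common elements, deduplicated
subtract = [x for x in a if x not in b_set] # like subtract(): elements of a that are not in b
cartesian = list(product(a[:2], b))         # like cartesian(): every (x, y) pair

print(union)
print(distinct)
print(intersection)
print(subtract)
print(cartesian)
```

Note that the union keeps duplicates, so a `distinct()` is needed afterwards when set semantics are wanted.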

In [None]:
lines = sc.parallelize(["hello world", "hi Pepe, who are you?"])
rdd = sc.parallelize([1,2,3,4,5,6,7,8,9,10])
uRDD = rdd.union(lines)
uRDD.collect()


### Activity

In [None]:
# C. Count the number of words in the file after converting them to lowercase and removing words with fewer than 4 characters; then count the number of distinct words.
# TIP: map, flatMap, filter,
# distinct().count()

# **Pair RDDs**

Pair RDDs are a useful building block in many programs, as they expose operations that allow you to act on each key in parallel or regroup data across the network. For example, pair RDDs have a reduceByKey() method that can aggregate data separately for each key, and a join() method that can merge two RDDs together by grouping elements with the same key.
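
What aggregating "separately for each key" means can be shown with a small plain-Python sketch. `reduce_by_key` is a hypothetical helper that imitates the result of reduceByKey() on a local list; it does not model partitioning or shuffling.

```python
def reduce_by_key(pairs, func):
    """Combine all values that share a key with func, one key at a time."""
    acc = {}
    for key, value in pairs:
        acc[key] = func(acc[key], value) if key in acc else value
    return acc


pairs = [("a", 3), ("b", 4), ("a", 1)]
sums = reduce_by_key(pairs, lambda x, y: x + y)

# Word count: emit (word, 1) for every word, then add the ones per key.
words = "to be or not to be".split()
counts = reduce_by_key([(w, 1) for w in words], lambda x, y: x + y)
print(sums, counts)
```

The word count uses the same pattern as a map to `(word, 1)` pairs followed by a per-key sum.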


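Common pair RDD operations. Of these, reduceByKey, groupByKey, combineByKey and sortByKey are wide transformations (they shuffle data across partitions); keys, values, mapValues and flatMapValues are narrow transformations; countByKey, collectAsMap and lookup are actions: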

- keys()
- values()
- reduceByKey(func)
- groupByKey()
- combineByKey(...)
- mapValues(func)
- flatMapValues(func)
- sortByKey()
- countByKey()
- collectAsMap()
- lookup(key)



In [None]:
lines = sc.textFile("sample_data/README.md")


In [None]:
pairs = lines.map(lambda x: (x.split(" ")[0], x))
pairs.take(1)

In [None]:
pairs.keys().count()

In [None]:
pairs.collect()

In [None]:
pairs.keys().collect()

In [None]:
pairs.keys().distinct().count()

In [None]:
## Reflection
words = lines.flatMap(lambda x: x.split(" "))
result = words.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y)

In [None]:
for kv in result.sortByKey().collect():
  print(kv)

The **join** operator performs an inner join: only keys present in both RDDs appear in the result.

In [None]:
data1 = [("a", 3), ("b", 4), ("a", 1)]
data2 = [("a", 5), ("b", 1), ("c", 1)]
d1 = sc.parallelize(data1)
d2 = sc.parallelize(data2)

In [None]:
for kv in d1.join(d2).collect():
  print(kv)

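**leftOuterJoin(other)** and **rightOuterJoin(other)** keep every key of the left or the right RDD, respectively, and use `None` where the other side has no match.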
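
The difference between the three joins can be sketched in plain Python on the same `data1` and `data2` lists. The `group` and `join` helpers are hypothetical and only imitate the shape of the result; the order of the output of a real RDD join is not guaranteed.

```python
def group(pairs):
    """Collect the values of each key in a list, keeping insertion order."""
    grouped = {}
    for key, value in pairs:
        grouped.setdefault(key, []).append(value)
    return grouped


def join(left, right, how="inner"):
    """Return (key, (left_value, right_value)) tuples for the chosen join type."""
    lgroups, rgroups = group(left), group(right)
    if how == "inner":
        keys = [k for k in lgroups if k in rgroups]
    elif how == "left":
        keys = list(lgroups)
    elif how == "right":
        keys = list(rgroups)
    else:
        raise ValueError(f"unknown join type: {how}")
    result = []
    for key in keys:
        # A missing side contributes a single None, as in an outer join.
        for lval in lgroups.get(key, [None]):
            for rval in rgroups.get(key, [None]):
                result.append((key, (lval, rval)))
    return result


data1 = [("a", 3), ("b", 4), ("a", 1)]
data2 = [("a", 5), ("b", 1), ("c", 1)]
inner = join(data1, data2)
left = join(data1, data2, "left")
right = join(data1, data2, "right")
print(inner)
print(left)
print(right)
```

Key `"c"` exists only in `data2`, so it shows up in the right outer join alone, paired with `None`.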

In [None]:
for kv in d1.leftOuterJoin(d2).collect():
  print(kv)

In [None]:
for kv in d1.rightOuterJoin(d2).collect():
  print(kv)

**Lookup**

In [None]:
l = range(100)
l1 = range(1,101)
rdd = sc.parallelize(zip(l, l1), 10)
print(rdd.take(5))
rdd.lookup(42)  # slow




In [None]:
sorted_rdd = rdd.sortByKey()  # "sorted_rdd" avoids shadowing the built-in sorted()
sorted_rdd.lookup(42)  # fast


In [None]:
import os
os.environ["PYTHONHASHSEED"] = "1"  # fixed hash seed so that keys hash consistently across Spark processes

rdd2 = sc.parallelize([(('a', 'b'), 'c')]).groupByKey()
list(rdd2.lookup(('a', 'b'))[0])

# Example: Page Rank Algorithm Implementation

$$PageRank(A) = \frac{(1 - d)}{N} + d * \sum_{B\in in(A)} \frac{PageRank(B)}{L(B)}$$


Where:

- A and B are pages.
- `PageRank(A)` is the PageRank value of page A.
- `d` is the damping factor (usually set to 0.85 in practice).
- `N` is the total number of pages in the network.
- `Σ` denotes the sum over all pages B that link to page A.
- in(A) is the set of pages that link to page A.
- `PageRank(B)` is the PageRank value of page B.
- `L(B)` is the number of outgoing links from page B.


Suppose we have four web pages (A, B, C and D) in a network and that initially they all have the same PageRank. The damping factor (d) is 0.85.

Relations:

- A <- B
- B <- A, C
- C <- B
- D <- B

Iterations:

* Iteration 0 (initial values):



PageRank(A) = PageRank(B) = PageRank(C) = PageRank(D) = 0.25

* Iteration 1 (each rank is computed from the iteration 0 values; B has three outgoing links, to A, C and D, so $L(B) = 3$, while $L(A) = L(C) = 1$):


\begin{align*}
PageRank(A) & = \frac{(1 - 0.85)}{4} + 0.85 \cdot \frac{PageRank(B)}{3} \\
& = 0.0375 + 0.85 \cdot \frac{0.25}{3} \approx 0.1083
\end{align*}

\begin{align*}
PageRank(B) & = \frac{(1 - 0.85)}{4} + 0.85 \cdot \left(\frac{PageRank(A)}{1} + \frac{PageRank(C)}{1}\right) \\
& = 0.0375 + 0.85 \cdot (0.25 + 0.25) = 0.4625
\end{align*}

\begin{align*}
PageRank(C) & = \frac{(1 - 0.85)}{4} + 0.85 \cdot \frac{PageRank(B)}{3} \\
& = 0.0375 + 0.85 \cdot \frac{0.25}{3} \approx 0.1083
\end{align*}


\begin{align*}
PageRank(D) & = \frac{(1 - 0.85)}{4} + 0.85 \cdot \frac{PageRank(B)}{3} \\
& = 0.0375 + 0.85 \cdot \frac{0.25}{3} \approx 0.1083
\end{align*}
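
The update rule can also be evaluated programmatically. The following is a minimal plain-Python sketch (no Spark), assuming the link structure given by the relations above (B links to A, C and D; A and C link to B; D has no outgoing links) and a synchronous update, in which every new rank is computed from the ranks of the previous iteration. `pagerank_step`, `out_links`, `initial` and `updated` are hypothetical names used only here.

```python
def pagerank_step(ranks, out_links, d=0.85):
    """One synchronous update: (1 - d) / N + d * sum(PR(B) / L(B))."""
    n = len(ranks)
    new_ranks = {}
    for page in ranks:
        # Sum the contributions of every page that links to `page`.
        incoming = sum(
            ranks[src] / len(targets)
            for src, targets in out_links.items()
            if page in targets
        )
        new_ranks[page] = (1 - d) / n + d * incoming
    return new_ranks


# Outgoing links derived from the relations (D has none).
out_links = {"A": ["B"], "B": ["A", "C", "D"], "C": ["B"]}
initial = {page: 0.25 for page in "ABCD"}
updated = pagerank_step(initial, out_links)

for page, rank in sorted(updated.items()):
    print(f"{page}: {rank:.4f}")
```

Calling `pagerank_step` repeatedly on its own output gives the later iterations. The ranks do not sum to 1 here because D has no outgoing links and its rank is not redistributed.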



In [None]:
# REF: https://github.com/Proxy08/PageRank/blob/main/pagerank-spark.py

In [None]:
import re

def computeContribs(urls, rank):
    """Calculates URL contributions to the rank of other URLs."""
    num_urls = len(urls)
    for url in urls:
        yield (url, rank / num_urls)


def parseNeighbors(urls):
    """Parses a urls pair string into urls pair."""
    parts = re.split(r'\s+', urls)
    return parts[0], parts[1]

In [None]:
from pyspark.sql import SparkSession


spark = SparkSession\
    .builder\
    .appName("PageRank")\
    .getOrCreate()

lines = spark.read.text("pageRank_data.txt")
lines = lines.rdd.map(lambda r: r[0])
for i in lines.collect():
  print(i)

In [None]:

# Loads all URLs from input file and initialize their neighbors.
links = lines.map(lambda urls: parseNeighbors(urls)).distinct().groupByKey().cache()


In [None]:
for i in links.collect():
  print(i[0])
  for j in i[1]:
    print("\t",j)

In [None]:

# Initializes the rank of every URL that has outgoing links to 1.0.
ranks = links.map(lambda url_neighbors: (url_neighbors[0], 1.0))

In [None]:
for i in ranks.collect():
  print(i[0],i[1])


In [None]:
t = links.join(ranks)
t.take(1)

In [None]:
contribs = links.join(ranks).flatMap(lambda url_urls_rank: computeContribs(
    url_urls_rank[1][0], url_urls_rank[1][1]  # type: ignore[arg-type]
))

In [None]:
a = links.join(ranks).flatMap(lambda url_urls_rank: (url_urls_rank[1][0], url_urls_rank[1][1]))


In [None]:
for i in a.collect():
 if (type(i)!=float):
  for x in i:
    print(x)
 else:
  print(i)


In [None]:
contribs.take(1)

In [None]:
for i in contribs.collect():
  print(i)

In [None]:
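from operator import add  # required by reduceByKey below

# Non-normalized PageRank update: (1 - d) + d * (sum of contributions), with d = 0.85
ranks = contribs.reduceByKey(add).mapValues(lambda rank: rank * 0.85 + 0.15)
for i in ranks.collect():
  print(i)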

In [None]:
from operator import add

# Calculates and updates URL ranks continuously using PageRank algorithm.
for iteration in range(5):
    # Calculates URL contributions to the rank of other URLs.
    contribs = links.join(ranks).flatMap(lambda url_urls_rank: computeContribs(
        url_urls_rank[1][0], url_urls_rank[1][1]  # type: ignore[arg-type]
    ))

    # Re-calculates URL ranks based on neighbor contributions.
    ranks = contribs.reduceByKey(add).mapValues(lambda rank: rank * 0.85 + 0.15)

# Collects all URL ranks and dump them to console.
for (link, rank) in ranks.collect():
    print("%s has rank: %s." % (link, rank))



# Activity

In [None]:
# D. Keys and values:
products =[("manzana", 1), ("banana", 2), ("manzana", 3), ("pera", 2),("pera", 8)]
# list the keys and the values, and count them


In [None]:
# E. Sum the prices of each product:
products =[("manzana", 1), ("banana", 2), ("manzana", 3), ("pera", 2),("pera", 8)]
# TIP: reduceByKey, groupByKey

In [None]:
# F. Compute the average of each group:
groups = [("A", 10), ("A", 20), ("B", 5), ("B", 15)]
# TIP: mapPartitions, reduceByKey, map


# Activity

In [None]:
!pip install faker faker-commerce

We have a transactions file
```csv
order_id, product_id, quantity, price, status
1, 101, 2, 15.99, COMPLETE
2, 103, 1, 29.99, PENDING
3, 101, 1, 15.99, CANCELED
...
```

called `orders.csv`

In [None]:
import pandas as pd
import numpy as np
np.random.seed(1)
size = 1000
states = ["COMPLETE","PENDING","CANCELED"]
product_id = np.random.choice(np.random.randint(200,10090,size=500),size)
quantity = np.random.randint(1,10,size)
price = np.random.randint(1,3000,size)
status = np.random.choice(states,size,p=[0.7,0.25,0.05])
pd.DataFrame({
    "order_id": range(size),
    "product_id": product_id,
    "quantity": quantity,
    "price": price,
    "status": status,
}).to_csv("orders.csv", index=False)  # index=False: do not write the pandas index as an extra column

And a products file, called `products.csv`:

```csv
product_id, product_name, category, company
101, Widget A, Electronics, Leach-Smith
103, Widget B, Home, Hopkins Inc
...
```

In [None]:
from faker import Faker
import faker_commerce
Faker.seed(1)

u_product= np.unique(product_id)
size = len(u_product)
print(size)

# category= ["Automative","Home","Electronics","Retail","Health Care","Books","Media"]
fake = Faker()
fake.add_provider(faker_commerce.Provider)
    
pd.DataFrame({"product_id":u_product, 
              "product_name":[fake.name() for _ in range(size)], 
              "category":[fake.ecommerce_category() for _ in range(size)], 
              "company":[fake.company() for _ in range(size)], 
              }).to_csv("products.csv", index=False)  # index=False: do not write the pandas index as an extra column

### Tasks
- A. Combine both files 
- B. Track the billing:
  - a. Count the number of orders in each status (COMPLETE, CANCELED, PENDING).
  - b. Total revenue per product: keep only the orders with status COMPLETE and compute the total revenue for each product_id by multiplying quantity by price. Map the results to a structure with the product name and the category.
  

In [None]:
# TODO