<h1><center>Προχωρημένα Θέματα Βάσεων Δεδομένων</center></h1>
<h2><center>Εξαμηνιαία Εργασία</center></h2>
<hr>

| Ονοματεπώνυμο      | Αριθμός Μητρώου |
| ------------------ | --------------- |
| Δοντάς Σπυρίδων    | 03114141        |
| Τσιούρβας Αστέριος | 03114133        |

<hr>
<h1><center>Προεργασία</center></h1>

In [1]:
import datetime as dt
import pandas as pd
from operator import add
from functools import partial

from IPython.display import display, HTML

<hr>
<h1><center>Ερώτημα 1</center></h1>

Σε αυτό το ερώτημα κληθήκαμε να υπολογίσουμε τα εξής:
* το μέσο διάστημα διαδρομής ανά ώρα εκκίνησης διαδρομής
* το μέγιστο ποσό που πληρώθηκε για κάθε εταιρεία

### Διάβασμα δεδομένων σε `RDDs`

In [2]:
vendors = sc.textFile("/Project/yellow_tripvendors_1m.csv")
data = sc.textFile("/Project/yellow_tripdata_1m.csv")

### Συνάρτηση για δημιουργία `key - value`

In [3]:
def start_duration(line):
    """Function to map over RDD lines.
    
    Produces the necessary tuples for key - value processing.
    The starttime of the trip becomes the key, while the duration
    of the trip becomes the value.
    
    Parameters
    ----------
    
    line: string
        CSV delimited string from RDD.
        
    Returns
    -------
    
    keyval: tuple of (string, float)
        Key   -> starttime
        Value -> duration
    """
    contents = line.split(",")
    start, end = contents[1], contents[2]
    starttime = dt.datetime.strptime(start, "%Y-%m-%d %H:%M:%S")
    endtime = dt.datetime.strptime(end, "%Y-%m-%d %H:%M:%S")
    diff = endtime - starttime
    diff = diff.total_seconds() / 60
    return "{:02d}".format(starttime.hour), diff

### Map
Με τη μέθοδο `map` δημιουργούμε ένα νέο `RDD` από `tuples` τις οποίες το `Spark` αντιμετωπίζει ως ζευγάρι `key - value`. Το παραπάνω θα ισχύει για όλες τις επόμενες χρήσεις μιας αρχικής κλήσης `map` πάνω στα αρχικά δεδομένα.
***
Συγκεκριμένα, εδώ θα έχουμε ως `key` την ώρα εκκίνησης της διαδρομής και ως `value` τη διάρκεια της διαδρομής. 

In [4]:
duration = data.map(start_duration)

### Εύρεση μέσης διάρκειας διαδρομής ανά ώρα εκκίνησης
* μετατρέπουμε κάθε τιμή σε `tuple` της μορφής `(value, 1)`
* εφαρμόζουμε `reduceByKey` με μια συνάρτηση η οποία αθροίζει τα στοιχεία κάθε τούπλας
  * σαν αποτέλεσμα για κάθε κλειδί θα έχουμε μία `tuple` της μορφής `(sum(values), num(values))`
* εφαρμόζουμε για κάθε τιμή (που είναι της μορφής `(sum, len)`) τη συνάρτηση `sum / num` (η οποία παράγει το μέσο όρο)

In [5]:
duration = duration.mapValues(lambda x: (x, 1))\
                   .reduceByKey(
                        lambda a, b: tuple(map(sum, zip(a, b)))
                    )\
                   .mapValues(lambda x: x[0] / x[1])

### Αποθήκευση αρχείου στο `HDFS`
* μετατρέπουμε το `RDD` σε `PySpark DataFrame` με `headers`
* μαζεύουμε σε ένα `partition`
* αποθηκεύουμε ως `csv`

In [6]:
df = duration.toDF(["HourOfDay", "AverageTripDuration"])\
             .sort("HourOfDay")
df.coalesce(1)\
  .write\
  .csv("/Project/TripDuration", header="true", mode="overwrite")

### Εμφάνιση αποτελεσμάτων

In [7]:
display(HTML(df.toPandas().to_html(index=False)))

HourOfDay,AverageTripDuration
0,14.017794
1,13.97507
2,13.035636
3,13.322283
4,13.799858
5,13.275583
6,12.48742
7,13.395006
8,14.627505
9,14.670106


### Συνάρτηση για δημιουργία `key - value`

In [8]:
def id_price(line):
    """Function to map over RDD lines.
    
    Produces the necessary tuples for key - value processing.
    The id of the trip becomes the key, while the price of
    the trip becomes the value.
    
    Parameters
    ----------
    
    line: string
        CSV delimited string from RDD.
        
    Returns
    -------
    
    idprice: tuple of (string, float)
        Key   -> id of trip
        Value -> price of trip
    """
    contents = line.split(",")
    return contents[0], float(contents[7])

### Map
Εφαρμόζουμε `map` τόσο στο `vendors RDD` όσο και στο `data RDD` ως εξής:
* για το `vendors` δημιουργούμε ένα νέο `RDD` με `key` το id της διαδρομής και `value` το id της εταιρείας
* για το `data` δημιουργούμε ένα νέο `RDD` με `key` το id της διαδρομής και `value` το ποσό που πληρώθηκε για τη διαδρομή

In [9]:
parts = 100
vendors = vendors.map(lambda x: tuple(x.split(",")))\
                 .repartition(parts)
prices = data.map(id_price)\
             .repartition(parts)

### Inner Join
Ενώνουμε τα δύο `RDDs` σε ένα, εφαρμόζοντας inner join πάνω στο id της διαδρομής. Το παραγόμενο `RDD` αποτελείται από `tuples` όπου το κλειδί είναι το id της διαδρομής και η τιμή είναι ένα `tuple` με το id της εταιρείας και το κόστος της διαδρομής.

In [10]:
prices = vendors.join(prices, numPartitions=parts)

### Εύρεση μέγιστης τιμής ανά εταιρεία
* εφαρμόζουμε `map` στο `RDD` για δημιουργία `key - value` με `key` το id της εταιρείας και `value` την τιμή της διαδρομής (το id της διαδρομής πλέον δε μας ενδιαφέρει)
* εφαρμόζουμε `reduceByKey` με τη συνάρτηση `max` για να βρούμε το μέγιστο για κάθε εταιρεία

In [11]:
prices = prices.map(lambda x: (x[1][0], x[1][1]))\
               .reduceByKey(max)

### Αποθήκευση αρχείου στο `HDFS`
* μετατρέπουμε το `RDD` σε `PySpark DataFrame` με `headers`
* μαζεύουμε σε ένα `partition`
* αποθηκεύουμε ως `csv`

In [12]:
df = prices.toDF(["VendorID", "MaxAmountPaid"])\
           .sort("VendorID")
df.coalesce(1)\
  .write\
  .csv("/Project/MaxAmountPaid", header="true", mode="overwrite")

### Εμφάνιση αποτελεσμάτων

In [13]:
display(HTML(df.toPandas().to_html(index=False)))

VendorID,MaxAmountPaid
1,503326.33
2,548463.35


<hr>
<h1><center>Ερώτημα 2</center></h1>

Σε αυτό το ερώτημα θα πρέπει εφαρμόζοντας τον αλγόριθμο `k-Means` να βρούμε τα κέντρα των top 5 περιοχών επιβίβασης.

In [14]:
sqlContext.clearCache()

### Ορισμός ευκλείδιας απόστασης

In [15]:
def euclidean(p, q):
    """Calculate euclidean distance.
    
    Formula:
    .. math::
        \sqrt{\sum (p_i - q_i)^2}
        
    Parameters
    ----------
    
    p: iterable of numbers
        Contains the coordinates of the first point.
        
    q: iterable of numbers
        Contains the coordinates of the second point.
        
    Returns
    -------
    
    euclidean: float
    """
    sum = 0
    for pi, qi in zip(p, q):
        sum += (pi - qi) ** 2
    return sum ** (1 / 2)

### Συνάρτηση για δημιουργία συντεταγμένων εκκίνησης

In [16]:
def coordinates(line):
    """Function to map over RDD lines.
    
    Produces the necessary Coordinates tuples.
    
    Parameters
    ----------
    
    line: string
        CSV delimited string from RDD.
        
    Returns
    -------
    
    coordinates: tuple of numbers
    """
    contents = line.split(",")
    lng, lat = map(float, contents[3:5])
    return lng, lat

### Αρχικοποιήσεις
* Εφαρμόζουμε `map` στο `data RDD` δημιουργώντας το `RDD population` αποτελούμενο από συντεταγμένες εκκίνησης σε `tuple`.
* `k = 5`, ο αριθμός των top κέντρων που θέλουμε να βρούμε.
* `iterations = 3`, ο αλγόριθμος θα κάνει 3 επαναλήψεις.
* Βρίσκουμε την αρχική περίπτωση `centroids`, τα οποία θεωρούμε ότι είναι τα πρώτα 5 στο `RDD population`.

In [17]:
population = data.map(coordinates)
k = 5
iterations = 3
centroids = [(i, c) for i, c in enumerate(population.take(k), 1)]

Η παραπάνω αρχικοποίηση των `centroids` θα μπορούσε να είχε γραφεί:
```python
centroids = list(enumerate(population.take(k), 1))
```

### Συνάρτηση για εύρεση κοντινότερου κέντρου

In [18]:
def closest(centroids, coordinates):
    """Find closest centroid.
    
    Computes the minimum euclidean distance
    and saves centroid id.
    
    Parameters
    ----------
    
    centroids: iterable
        Contains the centroids as tuples of id, coordinates.
    
    coords: tuple of floats
        Tuple contains the coordinates.
        
    Returns
    -------
    
    id_coords: tuple of int, tuple
        Tuple containing the id of the closest centroid and
        the tuple of coordinates of this point.
    """
    distance = min(
        (
            (centroid[0], euclidean(coordinates, centroid[1])) 
            for centroid in centroids
        ),
        key=lambda x: x[1]
    )
    return (distance[0], coordinates)

### Συνάρτηση για άθροισμα εμφολευμένων `iterable`

In [19]:
def sum_by_elem(p, q):
    """Function to reduce over RDD values.
    
    Gets two elements and sums each coordinate.
    
    Parameters
    ----------
    
    p: tuple of tuple, int
        Contains tuple of floats and an int
    
    q: tuple of tuple, int
        Contains tuple of floats and an int
    
    Returns
    -------
    
    sum_by_elem: tuple of tuple, int
        Tuple of floats, int
    """
    p, num1 = p
    q, num2 = q
    return (tuple(map(sum, zip(p, q))), num1 + num2)

### Συνάρτηση για εύρεση μέσου όρου ανά στοιχείο

In [20]:
def avg_by_elem(p):
    """Function to reduce over RDD values.
    
    Gets an element and averages each coordinate.
    
    Parameters
    ----------
    
    p: tuple of tuple, int
        Contains a tuple with summed by element values, sum of ints
    
    Returns
    -------
    
    avg_by_elem: tuple of floats
        For each tuple element divide with number of elements
    """
    p, num = p
    return tuple(map(lambda x: x / num, p))

### Κύριος Αλγόριθμος
Σε κάθε iteration του αλγορίθμου ακολουθούμε τα εξής βήματα:
* εφαρμόζουμε `map` για να δημιουργήσουμε ένα `RDD` με τις συντεταγμένες κάθε σημείου και ένα αντίγραφο των κέντρων
* εφαρμόζουμε `map` με τη συνάρτηση `closest` στο παραπάνω `RDD` για να δημιουργήσουμε ένα `RDD` με `key` το id του κοντινότερου cluster και `value` τις συντεταγμένες του σημείου
* εφαρμόζουμε `mapValues`-`reduceByKey`-`mapValues` για την εύρεση των νέων κέντρων με βάση τους μέσους όρους ανά συντεταγμένη (η διαδικασία είναι παρόμοια με την εύρεση του μέσου όρου στο ερώτημα 1 με τη διαφορά ότι εδώ βρίσκουμε μέσο όρο σε πολλά στοιχεία ανά στοιχείο)

In [21]:
for _ in range(iterations):
    pclosest = partial(closest, centroids)
    points_labels = population.map(pclosest)
    new_centroids = \
        points_labels.mapValues(lambda x: (x, 1))\
                     .reduceByKey(sum_by_elem)\
                     .mapValues(avg_by_elem)
    centroids = new_centroids.collect()

### Αποθήκευση αρχείου στο `HDFS`
* μετατρέπουμε το `RDD` σε `PySpark DataFrame` με `headers`
* μαζεύουμε σε ένα `partition`
* αποθηκεύουμε ως `csv`

In [22]:
centroids = new_centroids.toDF(["Id", "Centroid"])
centroids.withColumn("Centroid", centroids.Centroid.cast("string"))\
         .coalesce(1)\
         .write\
         .csv(
              "/Project/Centroids",
              header="true",
              mode="overwrite",
              quote=""
          )

### Εμφάνιση αποτελεσμάτων

In [23]:
display(HTML(centroids.toPandas().to_html(index=False)))

Id,Centroid
1,"(-78.5038663350961, 40.61111272363974)"
2,"(-0.000130403195932698, 0.0005087396653170571)"
3,"(-73.95393670658325, 40.69903130538949)"
4,"(-73.9921679218484, 40.742681085334965)"
5,"(-73.96113225987052, 40.77182523315749)"


<hr>
<h1><center>Ερώτημα 3</center></h1>

Σε αυτό το ερώτημα θα πρέπει να βρούμε το PageRank κάθε κόμβου που βρίσκεται στα δεδομένα της `Google` με βάση τον επαναληπτικό τύπο:
$$ PR(p_i) = \frac{1 - d}{N} + d \sum_{p_j \in M(p_i)}{} \frac{PR(p_j)}{L(p_j)} $$
, όπου:
* $N$ ο αριθμός των κόμβων
* $d$ ο συντελεστής απόσβεσης
* $M(p_i)$ το σύνολο των σελίδων που δείχνουν στην $p_i$
* $L(p_j)$ το σύνολο των outbound links της σελίδας $p_j$

In [24]:
sqlContext.clearCache()

### Διάβασμα δεδομένων

In [25]:
data = sc.textFile("/Project/web-Google.txt")

### Συνάρτηση για δημιουργία `RDD`

In [26]:
def parse_nodes(line):
    """Function to map over RDD lines.
    
    Finds the from and to ids.
    
    Parameters
    ----------
    
    line: string
        CSV (space) delimited line from RDD.
        
    Returns
    -------
    
    nodes: tuple of ints
    """
    contents = line.split()
    return tuple(map(int, contents))

### Αρχικοποιήσεις
* Εφαρμόζουμε `map` στα δεδομένα για να δημιουργήσουμε ένα `RDD` που περιέχει σε `tuples` τα `from` και `to` ids των nodes.
* Δημιουργούμε ένα νέο `RDD` με βάση το προηγούμενο, έχοντας μαζέψει τα δεδομένα ανά `from` id, και ύστερα το `RDD` θα αποτελείται από `tuple` με `(from id, neighbors list)`.
* Δημιουργούμε άλλο ένα `RDD` με βάση το παραπάνω, το οποίο έχει για κάθε κλειδί (`from id`) το αρχικό σκορ.
* `iterations = 5`, επαναλαμβάνουμε τον αλγόριθμο για 5 επαναλήψεις.
* `N`, ο αριθμός των δεδομένων του `RDD`.
* `d = 0.85`, εξ' υποθέσεως.

In [27]:
parts = 10
web = data.map(parse_nodes)\
          .repartition(parts)

adjacent = web.groupByKey()\
              .mapValues(list)
score = adjacent.mapValues(lambda _: 0.5)
iterations = 5
N = 875713
d = 0.85

### Συνάρτηση για εφαρμογή αλγορίθμου εντός αθροίσματος

In [28]:
def contribution(nodes, score):
    """Generator function for flatMap.
    
    Finds for each outgoing node the:
    .. math::
        \frac{PR(p_j)}{L(p_j)}
        
    Parameters
    ----------
    
    nodes: iterable
        Contains the node ids.
        
    score: float
        The PR of the current node.
        
    Yields
    ------
    
    node_score: tuple of int, float
        The tuple containing the node id and its new score
        from this outgoing node.
    """
    L = len(nodes)
    for node in nodes:
        yield (node, score / L)

### Συνάρτηση για εφαρμογή του τύπου με υπολογισμένο άθροισμα

In [29]:
def PR(N, d, score):
    return ((1 - d) / N) + (d * score)

PPR = partial(PR, N, d)

### Εφαρμογή αλγορίθμου
* Ενώνουμε `adjacent` και `scores`, με βάση το `from id`.
* Εφαρμόζουμε `flatMap` για να έχουμε για κάθε `id` ένα επιμέρους σκορ.
* Εφαρμόζουμε `reduceByKey` πάνω στο παραπάνω `flatMapped RDD` ώστε να βρούμε το άθροισμα όλων των επιμέρους σκορ.
* Εφαρμόζουμε `mapValues` στο παραπάνω για να εφαρμόσουμε τον τελικό τύπο και έτσι βρίσκουμε τα ανανεωμένα σκορ για κάθε κόμβο.

In [30]:
for _ in range(iterations):
    score = adjacent.join(score, numPartitions=parts)\
                    .flatMap(
                         lambda x: contribution(x[1][0], x[1][1])
                     )\
                    .reduceByKey(add)\
                    .mapValues(PPR)

### Αποθήκευση αρχείου στο `HDFS`
* μετατρέπουμε το `RDD` σε `PySpark DataFrame` με `headers`
* μαζεύουμε σε ένα `partition`
* αποθηκεύουμε ως `csv`

In [31]:
score = score.toDF(["NodeId", "PageRank"])\
             .sort("NodeId")
score.coalesce(1)\
     .write\
     .csv("/Project/PageRank", header="true", mode="overwrite")

### Εμφάνιση αποτελεσμάτων

In [32]:
display(HTML(score.toPandas().head().to_html(index=False)))

NodeId,PageRank
0,6.098674
1,0.356691
2,0.570618
3,0.067347
4,0.110981
