## Problem: Given a dataset of bike trips with the geo-spatial coordinates of each trip's start and end points, compute the total distance covered by all users collectively.

#### The dataset is taken from https://github.com/danielbeach/data-engineering-practice/tree/main/Exercises/Exercise-6/data

#### It is possible to compute the distance between two geo-spatial coordinates (lat-long pairs).
Refer to https://www.movable-type.co.uk/scripts/latlong.html for the (haversine) formula.
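For reference, the haversine formula from that page, with latitude $\varphi$ and longitude $\lambda$ in radians, $\Delta\varphi = \varphi_2 - \varphi_1$, $\Delta\lambda = \lambda_2 - \lambda_1$, and the Earth's mean radius $R \approx 6371$ km, is:

$$
\begin{aligned}
a &= \sin^2\!\left(\frac{\Delta\varphi}{2}\right) + \cos\varphi_1 \cos\varphi_2 \sin^2\!\left(\frac{\Delta\lambda}{2}\right) \\
c &= 2\,\operatorname{atan2}\!\left(\sqrt{a}, \sqrt{1-a}\right) \\
d &= R\,c
\end{aligned}
$$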

### Import necessary packages

In [None]:
import math
import pandas as pd
from tqdm.auto import tqdm

In [None]:
# Add progress bar to pandas apply() functions
tqdm.pandas()

In [None]:
# compute the 'haversine' (great-circle) distance in metres between two geo positions given in degrees
# Refer to https://www.movable-type.co.uk/scripts/latlong.html for the formula.
def distance(lat1, lon1, lat2, lon2):
    R = 6371e3  # mean radius of Earth in metres
    φ1 = math.radians(lat1)  # φ, λ in radians
    φ2 = math.radians(lat2)
    Δφ = math.radians(lat2 - lat1)
    Δλ = math.radians(lon2 - lon1)

    a = math.sin(Δφ / 2) ** 2 + math.cos(φ1) * math.cos(φ2) * math.sin(Δλ / 2) ** 2
    c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))

    # distance in metres, kept as a float so that summing many trips does not
    # accumulate truncation error; NaN inputs simply propagate as NaN
    return R * c

#### load the data file into a pandas dataframe

In [None]:
DATAFILE = 'Divvy_Trips_2020_Q1.xlsx'
DATAFILE_PQ = 'Divvy_Trips_2020_Q1.parquet'

In [None]:
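# One-time conversion: Parquet loads much faster than Excel, so the spreadsheet was converted once.
# Uncomment these lines to regenerate the Parquet file from the original spreadsheet.
#df = pd.read_excel(DATAFILE)
#df = df.dropna()
#df = df.astype(str)  # every column is stored as a string, hence the float() casts below
#df.to_parquet(DATAFILE_PQ)

# load the data file into a pandas dataframe
df = pd.read_parquet(DATAFILE_PQ)
# drop any rows with missing values
df = df.dropna()
# view the top 5 records.
df.head()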

### collect the geo location pairs per record and call the distance function on each record

In [None]:
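# columns are stored as strings, so cast them to float; look values up by column name,
# because positional access such as x[0] on a Series is deprecated in recent pandas
df['distance'] = df[['start_lat', 'start_lng', 'end_lat', 'end_lng']].progress_apply(
    lambda r: distance(float(r['start_lat']), float(r['start_lng']),
                       float(r['end_lat']), float(r['end_lng'])), axis=1)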
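
Row-wise `progress_apply` calls the Python function once per trip, which is slow for hundreds of thousands of rows. As an alternative sketch (not part of the original notebook), the same haversine formula can be evaluated on whole NumPy arrays at once. `haversine_np`, `EARTH_RADIUS_M` and the toy `trips` frame are hypothetical names used only for illustration.

```python
import numpy as np
import pandas as pd

EARTH_RADIUS_M = 6371e3  # mean Earth radius in metres, the same R as in distance()

def haversine_np(lat1, lon1, lat2, lon2):
    """Haversine distance in metres, evaluated element-wise on array-likes of degrees."""
    # convert every input to a float array in radians
    phi1, lam1, phi2, lam2 = (np.radians(np.asarray(v, dtype=float))
                              for v in (lat1, lon1, lat2, lon2))
    a = (np.sin((phi2 - phi1) / 2) ** 2
         + np.cos(phi1) * np.cos(phi2) * np.sin((lam2 - lam1) / 2) ** 2)
    return EARTH_RADIUS_M * 2 * np.arctan2(np.sqrt(a), np.sqrt(1 - a))

# toy frame shaped like the Divvy columns, stored as strings like the Parquet file
trips = pd.DataFrame({
    "start_lat": ["41.0", "41.9"], "start_lng": ["-87.6", "-87.6"],
    "end_lat":   ["42.0", "41.9"], "end_lng":   ["-87.6", "-87.6"],
})
coords = trips[["start_lat", "start_lng", "end_lat", "end_lng"]].astype(float)
trips["distance"] = haversine_np(coords["start_lat"], coords["start_lng"],
                                 coords["end_lat"], coords["end_lng"])
print(trips["distance"].round(1).tolist())  # [111194.9, 0.0]
```

The first toy trip spans exactly one degree of latitude, so its length is $R\pi/180 \approx 111.2$ km, a handy sanity check. On the notebook's `df`, the four columns would be cast with `.astype(float)` the same way before calling the function.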

### now compute the total by calling sum() on the distance column

In [None]:
total = df.distance.sum()
print('Total trip distance is', int(total/1000), 'kilometers over', df.shape[0], 'trips')

### let's look at some of the long distance trips

In [None]:
# get the records with more than 20km trips
df[df.distance>20000]

## Now let's solve it with PySpark, which can spread the work across parallel tasks.

### import the necessary packages

In [None]:
import pyspark

### create the SparkContext, the entry point that connects to the Spark cluster and runs our jobs

In [None]:
sc = pyspark.SparkContext()

#### We can monitor the jobs in the Spark web UI at http://localhost:4040
### let's create a Spark DataFrame from the pandas dataframe

In [None]:
from pyspark.sql import SparkSession

# build (or reuse) a SparkSession named 'pandas to spark';
# getOrCreate() reuses the SparkContext created above
spark = SparkSession.builder.appName("pandas to spark").getOrCreate()

# create a Spark DataFrame from the pandas dataframe
df_spark = spark.createDataFrame(df)

df_spark.show()

### let's compute the per-trip distance from the Spark dataframe's underlying RDD

In [None]:
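# look the columns up by name rather than by position (x[8] ... x[11]),
# so the code keeps working if the column order changes
dist = df_spark.rdd.map(lambda x: distance(float(x['start_lat']), float(x['start_lng']),
                                           float(x['end_lat']), float(x['end_lng'])))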

### let's now compute the total distance by reducing the RDD

In [None]:
total_distance = dist.reduce(lambda x,y: x+y)
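
`map` only records a transformation; Spark does no work until an action such as `reduce` (or `count`, `collect`) asks for a result. Python's built-in `map` is lazy in a similar way, so the pattern can be mimicked locally with `functools.reduce`. This is an analogy for illustration, not how Spark executes internally; `to_km`, `calls` and `trip_lengths` are hypothetical.

```python
import functools
import operator

calls = []  # records every time the mapper function actually runs

def to_km(metres):
    calls.append(metres)
    return metres / 1000

trip_lengths = [1200, 3400, 560]  # hypothetical per-trip distances in metres

# analogue of rdd.map(...): only builds a lazy iterator, nothing is computed yet
lazy_km = map(to_km, trip_lengths)
print(len(calls))  # 0

# analogue of rdd.reduce(...): consuming the iterator triggers the mapper calls
total_km = functools.reduce(operator.add, lazy_km)
print(len(calls), round(total_km, 2))  # 3 5.16
```

One difference worth keeping in mind: Spark reduces each partition separately and then merges the partial results, so the function passed to `RDD.reduce` should be commutative and associative, as addition is.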

### report the findings

In [None]:
count = df_spark.count()
print('Total trip distance is', int(total_distance/1000), 'kilometers over', count, 'trips')

### Let's try another way of doing this in Spark


#### let's get the lat/lon values from the pandas dataframe as a NumPy array

In [None]:
latlon_records = df[['start_lat', 'start_lng', 'end_lat', 'end_lng']].to_numpy()

In [None]:
latlon_records

### let's convert the data into an RDD. The number of slices (partitions) is an important parameter: Spark runs one task per partition, so it controls how many tasks can run in parallel.

In [None]:
latlon_rdd = sc.parallelize(latlon_records, numSlices=100)
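
Spark schedules one task per partition, and how many of those tasks run at the same time is further limited by the available cores. The helper below is a hypothetical plain-Python picture of dividing a collection into `num_slices` contiguous, roughly equal chunks; Spark's own slicing logic may differ in detail.

```python
def split_into_slices(items, num_slices):
    """Split items into num_slices contiguous, near-equal chunks (hypothetical helper)."""
    n = len(items)
    return [items[i * n // num_slices:(i + 1) * n // num_slices]
            for i in range(num_slices)]

slices = split_into_slices(list(range(10)), 3)
print([len(s) for s in slices])  # [3, 3, 4]
```

With `numSlices=100`, each task handles roughly 1% of the trips. The actual partition count of an RDD can be checked with `latlon_rdd.getNumPartitions()`.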

### let's now run the same job of computing the individual distances followed by the total distance

In [None]:
total_distance = (
    latlon_rdd
    .map(lambda x: distance(float(x[0]), float(x[1]), float(x[2]), float(x[3])))
    .reduce(lambda x, y: x + y)
)

### report the findings

In [None]:
count = latlon_rdd.count()
print('Total trip distance is', int(total_distance/1000), 'kilometers over', count, 'trips')

### Let's look at another example: processing files in parallel with Spark

## Problem: Given a folder of images, OCR them and compute the token distribution

* convert each image to text (OCR)
* combine the texts into one large blob
* tokenize the text into words (runs of letters and apostrophes)
* count the occurrences of each unique token
* save the output to a file
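Before distributing the work, it helps to have a sequential reference for the combine, tokenize and count steps to compare against the Spark output. A minimal sketch, assuming the OCR text is already available as a list of strings; `token_distribution` and the sample strings are hypothetical. It uses the same token regex as the `flatMap` cell further down, so tokens are case-sensitive.

```python
import re
from collections import Counter

TOKEN_RE = re.compile(r"[A-Za-z']+")  # runs of letters and apostrophes

def token_distribution(texts):
    """Sequential reference: combine texts, tokenize, and count each unique token."""
    blob = "\n".join(texts)          # combine the texts into one blob
    tokens = TOKEN_RE.findall(blob)  # tokenize; digits and punctuation are dropped
    return Counter(tokens)           # unique token -> number of occurrences

# hypothetical OCR outputs standing in for tesseract results
counts = token_distribution(["Date: 12/05", "date of the report", "The report"])
print(counts.most_common(1))  # [('report', 2)]
```

On the same images, `dict(counts)` should match the dictionary returned by the Spark pipeline's `countByKey()`.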

In [None]:
from pathlib import Path
FOLDER = 'funsd'

In [None]:
# sorted so the file order is reproducible (glob returns files in arbitrary order)
list_of_files = sorted(str(p) for p in Path(FOLDER).glob('*.*'))
list_of_files[:5]

### create a function to invoke the tesseract command

In [None]:
import subprocess as sp
import os

# limit tesseract to a single OpenMP thread, so that many OCR processes
# running in parallel do not oversubscribe the CPU cores
my_env = os.environ.copy()
my_env["OMP_THREAD_LIMIT"] = '1'

def ocr_task(path):
    # invoke the tesseract command to run OCR on the input image;
    # the output name '-' sends the text to stdout so that we can collect it in memory.
    # check=False so that a failure returns "" below instead of raising an exception.
    result = sp.run(['tesseract', path, '-'],
                    capture_output=True, text=True,
                    check=False, env=my_env)
    # check if the command executed without errors
    if result.returncode == 0:
        # return the OCR text
        return result.stdout
    # return an empty string on failure; it yields no tokens downstream
    return ""

### check that the function works on a single image

In [None]:
ocr_task('funsd/0060308251.png')

### let's gauge the time taken to run the OCR task sequentially

In [None]:
# map() is lazy in Python 3: no OCR runs until the iterator is consumed in the next cell
text_fragments = map(ocr_task, list_of_files[::-1])

In [None]:
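import time

# joining consumes the lazy map, so this is where the sequential OCR actually runs
start = time.perf_counter()
all_text = "\n".join(text_fragments)
print('Sequential OCR took', round(time.perf_counter() - start, 1), 'seconds')
all_text[:100]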

### now let's parallelize the OCR

In [None]:
lof_rdd = sc.parallelize(list_of_files[::-1], numSlices=8)

### we will use ocr_task as the mapper function (this is lazy: nothing runs until an action is called)

In [None]:
texts = lof_rdd.map(ocr_task)

### we will now tokenize each of the texts into a list of tokens
#### we use flatMap here, which is equivalent to map() followed by flattening the results

In [None]:
import re
# flatMap produces one flat sequence of tokens, whereas map would produce one list of tokens per text.
# as we are interested in counting the unique tokens, we need the flattened sequence.
tokens = texts.flatMap(lambda x: re.findall(r"[A-Za-z']+", x))
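
The difference in shape between `map` and `flatMap` can be seen with plain Python lists. This is a local analogy only; the whitespace tokenizer and sample texts are hypothetical simplifications of the regex used above.

```python
from itertools import chain

texts = ["the quick fox", "jumps", ""]  # hypothetical OCR outputs; the last image gave no text

def tokenize(text):
    return text.split()

# rdd.map(tokenize) analogue: one list of tokens per input text
mapped = list(map(tokenize, texts))
print(mapped)  # [['the', 'quick', 'fox'], ['jumps'], []]

# rdd.flatMap(tokenize) analogue: map, then flatten the per-text lists into one sequence
flat = list(chain.from_iterable(map(tokenize, texts)))
print(flat)  # ['the', 'quick', 'fox', 'jumps']
```

Note how the empty text contributes nothing to the flattened result, which is why an empty string from a failed OCR run simply drops out of the counts.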

### let's convert every token to a tuple (token, 1), which we can then count by key to get the distribution

In [None]:
token_tuples = tokens.map(lambda x: (x,1))

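### let's count by key to get the distribution now
countByKey() is an action: it runs the whole pipeline (including the OCR) and returns the counts to the driver as a dictionary.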

In [None]:
token_counts = token_tuples.countByKey()

### Let's create a dataframe with the token distribution results

In [None]:
newdf = pd.DataFrame(list(token_counts.items()), columns=["tokens", "freq"])


In [None]:
# sort by frequency so the most common tokens come first (also in the saved file)
newdf = newdf.sort_values('freq', ascending=False)
newdf

### now, save it as a spreadsheet

In [None]:
# writing .xlsx needs an Excel writer engine such as openpyxl
newdf.to_excel('/tmp/output.xlsx', index=False)

## Let's process Amazon reviews

### How can we do complex transformations using map functions?

In [None]:
# helper to read a text file
def read_file(path):
    # the with-block closes the file automatically
    with open(path, 'r') as file:
        return file.read()

# helper to view a sample of a text file:
# the first `length` lines if lines=True, otherwise the first `length` characters
def view_file(path, length=50, lines=False):
    textdata = read_file(path)

    # if we need lines, split it and display the required number of lines.
    sample = "\n".join(textdata.split("\n")[:length]) if lines else textdata[:length]

    print("TextSize:", len(textdata), "\n\nSample:", sample)

In [None]:
DATAFILE = 'Gourmet_Foods.txt'
view_file(DATAFILE, 20, lines=True)

### the data file is a single archive of reviews. We need to extract the review/text to construct a dataset for further processing
* scan the file for the "review/text:" pattern and extract the text to the right of it.
* also get the product id, so that we can map the review text to the product id.
* let's also pick up the review/score to record the star rating.
* now we should have a triplet (productid, rating, review_text) per review.
* note that the reviews are separated by blank lines, i.e. two or more consecutive newlines.

### let's read the data and split the data based on consecutive newlines

In [None]:
import re
# read the data file and split by \n\n+
reviews = re.split(r'\n\n+', read_file(DATAFILE))
print("number of reviews:", len(reviews))
print(reviews[1])

### create the parallelizable dataset

In [None]:
reviews_rdd = sc.parallelize(reviews, numSlices=8)

### we can define a function to process each block to extract the triplet

In [None]:
def process(text):
    match = re.search('product/productId: (.+)', text)
    product_id = match.group(1) if match else ""
    match = re.search('review/score: (.+)', text)
    star_rating = float(match.group(1)) if match else 0.0
    match = re.search('review/text: (.+)', text)
    review_text = match.group(1) if match else ""
    return (product_id, star_rating, review_text)

### let's extend the pipeline to include the extraction of triplets

In [None]:
triplets = reviews_rdd.map(process).filter(lambda x: x[0] != "" and x[1]>0.0 and x[2] != "")

### As an example task, let's group the data by product id to find the average rating.

In [None]:
# let's drop the texts first, then group the ratings by key (product id) and average them
product_rating = (
    triplets
    .map(lambda x: (x[0], x[1]))  # (product_id, star_rating)
    .groupByKey()                 # (product_id, iterable of ratings)
    .map(lambda p_r: (p_r[0], round(sum(p_r[1]) / len(p_r[1]), 2)))
)
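
`groupByKey` shuffles every rating of a product to one place before averaging. A common alternative is to combine partial `(sum, count)` pairs per key, which Spark can pre-aggregate within each partition; in PySpark that would look like `triplets.map(lambda x: (x[0], (x[1], 1))).reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1])).mapValues(lambda t: round(t[0] / t[1], 2))`. The plain-Python sketch below only illustrates the combine logic on hypothetical data; it is not the approach the notebook uses.

```python
def combine(a, b):
    """Merge two partial (sum, count) pairs; associative and commutative, as reduceByKey needs."""
    return (a[0] + b[0], a[1] + b[1])

# hypothetical (product_id, star_rating) pairs
ratings = [("B001", 5.0), ("B002", 3.0), ("B001", 4.0), ("B001", 2.0)]

partials = {}
for product_id, score in ratings:
    pair = (score, 1)  # each rating starts as a (sum, count) pair
    if product_id in partials:
        partials[product_id] = combine(partials[product_id], pair)
    else:
        partials[product_id] = pair

# divide once per product at the end
averages = {pid: round(s / n, 2) for pid, (s, n) in partials.items()}
print(averages)  # {'B001': 3.67, 'B002': 3.0}
```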

In [None]:
# run the pipeline now.
result = product_rating.collect()

In [None]:
# sort the result by ratings
result_sorted = sorted(result, key=lambda tup: tup[1], reverse=True)
result_sorted[:10]