# Exercices 

## Question 0

In [1]:
import os
from tempfile import NamedTemporaryFile, TemporaryDirectory
import json
import tarfile
from shutil import copyfile

In [2]:
def download_blob(blob: str) -> TemporaryDirectory:
    """Extract a gzip-compressed tar archive into a fresh temporary directory.

    Args:
        blob: Path of the ``.tar.gz`` archive on the local filesystem.

    Returns:
        The ``TemporaryDirectory`` holding the extracted members. The caller
        owns it: call ``cleanup()`` (or use it as a context manager) once the
        extracted files are no longer needed.

    Raises:
        tarfile.TarError: If ``blob`` is not a readable gzip tar archive.
        OSError: If ``blob`` cannot be opened.
    """
    temp_dir = TemporaryDirectory()
    try:
        with tarfile.open(blob, "r:gz") as tf:
            # Use the "data" extraction filter when this Python provides it,
            # so that archive members cannot escape the target directory.
            extract_kwargs = (
                {"filter": "data"} if hasattr(tarfile, "data_filter") else {}
            )
            tf.extractall(path=temp_dir.name, **extract_kwargs)
    except Exception:
        # Do not leave a half-populated directory behind on failure.
        temp_dir.cleanup()
        raise
    return temp_dir

def make_tarfile(output_filename, source_dir):
    """Pack ``source_dir`` into a gzip-compressed tar archive.

    The archive contains a single top-level entry named after the last path
    component of ``source_dir``.

    Args:
        output_filename: Path of the ``.tar.gz`` file to create.
        source_dir: Directory (or file) to add to the archive.
    """
    # normpath drops a trailing separator; without it basename() would be ''
    # for "some/dir/" and the archive would lose its top-level folder name.
    top_level_name = os.path.basename(os.path.normpath(source_dir))
    with tarfile.open(output_filename, "w:gz") as tar:
        tar.add(source_dir, arcname=top_level_name)

In [3]:
path = 'blobs/'
blobs_files = os.listdir(path)
# Create the output folder if it does not exist already.
target_directory = 'clean_blobs_1/'
os.makedirs(target_directory, exist_ok=True)

# Iterate through all compressed blobs. A blob whose metadata 'check_name'
# already matches its file-name prefix is copied as is; otherwise the metadata
# is corrected and the archive is rebuilt in the target directory.
for blob_name in blobs_files:
    blob_path = os.path.join(path, blob_name)
    target_file = os.path.join(target_directory, blob_name)

    # The expected check name is the part of the file name before the first '-'.
    name = blob_name.split('-')[0]

    # Using the TemporaryDirectory as a context manager removes the extracted
    # files as soon as this blob has been handled.
    with download_blob(blob_path) as extracted_dir:
        meta_path = os.path.join(extracted_dir, "metadata.json")
        result_path = os.path.join(extracted_dir, "result.json")

        with open(meta_path, "r") as f:
            meta = json.load(f)

        if meta['check_name'] == name:
            copyfile(blob_path, target_file)
            continue

        meta['check_name'] = name

        # Rewrite the metadata inside the private temp dir instead of the
        # current working directory, so no stray files are left behind.
        with open(meta_path, 'w') as outfile:
            json.dump(meta, outfile)

        # result.json is archived exactly as it was extracted.
        with tarfile.open(target_file, "w:gz") as tar:
            tar.add(meta_path, arcname='metadata.json')
            tar.add(result_path, arcname='result.json')

    

## Question 1

A typical Beam driver program works as follows:

1. Create a Pipeline object and set the pipeline execution options, including the Pipeline Runner.
2. Create an initial PCollection for pipeline data, either using the IOs to read data from an external storage system, or using a Create transform to build a PCollection from in-memory data.
3. Apply PTransforms to each PCollection. Transforms can change, filter, group, analyze, or otherwise process the elements in a PCollection. A transform creates a new output PCollection without modifying the input collection. A typical pipeline applies subsequent transforms to each new output PCollection in turn until processing is complete. However, note that a pipeline does not have to be a single straight line of transforms applied one after another: think of PCollections as variables and PTransforms as functions applied to these variables: the shape of the pipeline can be an arbitrarily complex processing graph.
4. Use IOs to write the final, transformed PCollection(s) to an external source.
5. Run the pipeline using the designated Pipeline Runner.

In [4]:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io import fileio
import zipfile
import io

I think we can use `beam.Map` with a custom function that is applied to each element of the PCollection.

https://beam.apache.org/documentation/patterns/file-processing/

https://beam.apache.org/documentation/transforms/python/elementwise/map/

https://stackoverflow.com/questions/51433664/read-files-from-multiple-folders-in-apache-beam-and-map-outputs-to-filenames

https://beam.apache.org/get-started/wordcount-example/


In [5]:
def process_blobs(blob, output_dir):
    """Write a cleaned copy of one local blob archive into ``output_dir``.

    The expected check name is the part of the archive's file name before the
    first '-'. If the archive's ``metadata.json`` already carries that value
    in 'check_name', the archive is copied unchanged; otherwise the metadata
    is corrected and the archive is rebuilt.

    Args:
        blob: Object exposing the archive's local path as
            ``blob.metadata.path`` (the pipeline passes the elements produced
            by ``fileio.ReadMatches``).
        output_dir: Directory that receives the cleaned archive; created if
            it does not exist.

    Returns:
        The path of the archive written to ``output_dir``.
    """
    blob_path = blob.metadata.path
    print(blob_path)

    # basename() works whether the matched path is relative or absolute.
    blob_name = os.path.basename(blob_path)
    check_name = blob_name.split('-')[0]

    os.makedirs(output_dir, exist_ok=True)
    target_file = os.path.join(output_dir, blob_name)

    # The extracted files live in a private temp dir that is removed on exit,
    # so elements processed in parallel never share scratch files.
    with download_blob(blob_path) as extracted_dir:
        meta_path = os.path.join(extracted_dir, "metadata.json")
        result_path = os.path.join(extracted_dir, "result.json")

        with open(meta_path, "r") as f:
            meta = json.load(f)

        if meta['check_name'] == check_name:
            copyfile(blob_path, target_file)
            return target_file

        meta['check_name'] = check_name
        with open(meta_path, 'w') as outfile:
            json.dump(meta, outfile)

        # result.json is archived exactly as it was extracted.
        with tarfile.open(target_file, "w:gz") as tar:
            tar.add(meta_path, arcname='metadata.json')
            tar.add(result_path, arcname='result.json')

    return target_file
    

In [6]:
# Match every local blob archive, open each match, and hand it to
# process_blobs together with the output directory as an extra argument.
with beam.Pipeline() as pipeline:
    matched_blobs = pipeline | fileio.MatchFiles('blobs/*.tar.gz')
    opened_blobs = matched_blobs | fileio.ReadMatches(compression='auto')
    readable_files = opened_blobs | beam.Map(process_blobs, 'clean_blobs_2')





## Question 2

En condiciones reales, los archivos residen en un bucket de [Google Cloud Storage](https://cloud.google.com/products/storage/) y hay petabytes de ellos: ¿cómo habrá que modificar la solución a la pregunta 1 para que el source y el sink sean buckets de Google Cloud en vez de archivos en local?

¿Cómo hay que diseñar el código para poder cambiar de source y sink fácilmente?

In [45]:
from google.cloud import storage
from google.oauth2 import service_account



class GcloudClient():
    """Small helper around a single Google Cloud Storage bucket.

    Args:
        project: Project identifier passed to ``storage.Client``.
        bucket_name: Name of the bucket that blobs are read from and
            written to.
    """

    def __init__(self, project="staging-infra-240711",
                 bucket_name="mapreduce-exercise-manuel"):
        self.storage_client = storage.Client(project)
        # Create a bucket object for our bucket
        self.bucket = self.storage_client.get_bucket(bucket_name)

    def download_blob_from_bucket(self, blob_name):
        """Download a ``.tar.gz`` blob and extract it into a temp directory.

        Args:
            blob_name: Name of the object inside the bucket.

        Returns:
            The ``TemporaryDirectory`` holding the extracted members. The
            caller owns it and should call ``cleanup()`` (or use it as a
            context manager) when done.
        """
        # Create a blob object from the filepath
        blob = self.bucket.blob(blob_name)

        with NamedTemporaryFile() as f:
            blob.download_to_file(f)
            # Rewind and read through the handle we already hold; re-opening
            # a NamedTemporaryFile by name while it is open is not portable.
            f.seek(0)

            temp_dir = TemporaryDirectory()
            try:
                with tarfile.open(fileobj=f, mode="r:gz") as tf:
                    # Use the "data" extraction filter when available so
                    # members cannot escape the target directory.
                    extract_kwargs = (
                        {"filter": "data"}
                        if hasattr(tarfile, "data_filter") else {}
                    )
                    tf.extractall(path=temp_dir.name, **extract_kwargs)
            except Exception:
                # Do not leave a half-populated directory behind on failure.
                temp_dir.cleanup()
                raise

            return temp_dir

    def upload_blob_to_bucket(self, target_location, source_file_name):
        """Upload the local file ``source_file_name`` as ``target_location``.

        Args:
            target_location: Object name to create inside the bucket.
            source_file_name: Path of the local file to upload.
        """
        blob = self.bucket.blob(target_location)
        blob.upload_from_filename(source_file_name)
        

def process_blobs_2(blob, output_dir):
    """Write a cleaned copy of one bucket blob back to the bucket.

    The expected check name is the part of the blob's file name before the
    first '-'. If the blob's ``metadata.json`` already carries that value in
    'check_name', the object is copied inside the bucket; otherwise the
    metadata is corrected and a rebuilt archive is uploaded.

    Args:
        blob: Object exposing the blob's full path as ``blob.metadata.path``;
            the path must contain a '/blobs/' segment.
        output_dir: Prefix inserted between 'clean_blobs_3/' and the blob name
            to form the destination object name.

    Returns:
        The destination object name inside the bucket.

    Raises:
        ValueError: If the blob path contains no '/blobs/' segment.
    """
    blob_path = blob.metadata.path
    _, marker, relative_name = blob_path.partition('/blobs/')
    if not marker:
        raise ValueError(
            "blob path {!r} has no '/blobs/' segment".format(blob_path)
        )
    blob_name = 'blobs/' + relative_name
    check_name = os.path.basename(blob_name).split('-')[0]

    gcloud_client = GcloudClient()

    # NOTE(review): with output_dir='clean_blobs_3' this yields a doubled
    # 'clean_blobs_3/clean_blobs_3/...' prefix. Kept as is so existing object
    # names do not move; confirm whether the hard-coded prefix is intended.
    target_location = os.path.join("clean_blobs_3/", output_dir, blob_name)

    # The extracted files live in a private temp dir that is removed on exit,
    # so elements processed in parallel never share scratch files.
    with gcloud_client.download_blob_from_bucket(blob_name) as extracted_dir:
        meta_path = os.path.join(extracted_dir, "metadata.json")
        result_path = os.path.join(extracted_dir, "result.json")

        with open(meta_path, "r") as f:
            meta = json.load(f)

        if meta['check_name'] == check_name:
            # Nothing to fix: copy the object inside the bucket rather than
            # uploading from a local path that may not exist on this machine.
            bucket = gcloud_client.bucket
            bucket.copy_blob(bucket.blob(blob_name), bucket, target_location)
            return target_location

        meta['check_name'] = check_name
        with open(meta_path, 'w') as outfile:
            json.dump(meta, outfile)

        # Build the corrected archive next to the extracted files;
        # result.json is archived exactly as it was extracted.
        rebuilt_archive = os.path.join(extracted_dir, "rebuilt.tar.gz")
        with tarfile.open(rebuilt_archive, "w:gz") as tar:
            tar.add(meta_path, arcname='metadata.json')
            tar.add(result_path, arcname='result.json')

        gcloud_client.upload_blob_to_bucket(target_location, rebuilt_archive)

    return target_location

In [46]:
# Match every blob archive in the bucket, open each match, and hand it to
# process_blobs_2 together with the output prefix as an extra argument.
with beam.Pipeline() as pipeline:
    matched_blobs = pipeline | fileio.MatchFiles('gs://mapreduce-exercise-manuel/blobs/*.tar.gz')
    opened_blobs = matched_blobs | fileio.ReadMatches(compression='auto')
    readable_files = opened_blobs | beam.Map(process_blobs_2, 'clean_blobs_3')



clean_blobs_3/clean_blobs_3/blobs/check_200-2021-09-05T13:43:09.003135.tar.gz
clean_blobs_3/clean_blobs_3/blobs/check_200-2021-09-18T05:28:23.003135.tar.gz
clean_blobs_3/clean_blobs_3/blobs/check_200-2021-09-29T02:30:43.003135.tar.gz
clean_blobs_3/clean_blobs_3/blobs/check_404-2021-08-21T17:26:17.003135.tar.gz
clean_blobs_3/clean_blobs_3/blobs/check_404-2021-08-31T12:49:19.003135.tar.gz
clean_blobs_3/clean_blobs_3/blobs/check_404-2021-08-31T14:02:25.003135.tar.gz
clean_blobs_3/clean_blobs_3/blobs/check_https-2021-08-21T05:58:24.003135.tar.gz
clean_blobs_3/clean_blobs_3/blobs/check_login-2021-08-18T15:36:53.003135.tar.gz
clean_blobs_3/clean_blobs_3/blobs/check_login-2021-08-24T03:16:43.003135.tar.gz
clean_blobs_3/clean_blobs_3/blobs/check_login-2021-09-07T15:40:00.003135.tar.gz
