In [None]:
"""
- Moving & Deserialization

Architecture of AWS-EMR (Amazon Web Services - Elastic MapReduce):
    => S3 : long-term persistent storage
    => head node : standard Unix file system
    => HDFS : Hadoop Distributed File System on the workers

"""
import os
import sys
import time

# on imagine qu'on a le fichier ALL.csv dans le head node (téléchargé sur un cloud ou un serveur ...)
%cd /mnt/workspace/edX-Micro-Master-in-Data-Science/big-data-analytics-using-spark/notebooks/Data/
!ls
!aws s3 ls s3://dse-weather/ALL.csv.gz
!aws s3 cp s3://dse-weather/ALL.csv.gz ./ALL.csv.gz
!rm ALL.csv
!gunzip ALL.csv.gz
!ls -l ALL.csv
!head -2 ALL.csv
#[...]
# distribution HDFS : creation du rep sur les workers
!hadoop fs -mkdir /weather
# distribution du fichier sur les workers:
!hadoop fs -copyFromLocal All.csv hdfs:///weather/weather.csv
# vérification de la présence du fichier sur les clusters
!hadoop fs -ls /weather


In [None]:
# verification de la présence de numpy_pack.py
%cd /mnt/workspace/edX-Micro-Master-in-Data-Science/big-data-analytics-using-spark/notebooks/Section2-PCA/PCA/data_preparation/ 
!ls lib

%pwd
!ls -l lib/numpy_pack.py

In [None]:
%%time
###Read CSV into a RDD :
from pyspark import SparkContext
# mise en place du cluster (ajout d'un fichier python pour utilisation de methodes / fonctions artisanales)
sc = SparkContext(pyFiles=['/mnt/workspace/edX-Micro-Master-in-Data-Science/big-data-analytics-using-spark/notebooks/Section2-PCA/PCA/data_preparation/lib/numpy_pack.py'])
#lecture CSV
RDD=sc.textFile('/weather/weather.csv')
RDD.count()

In [None]:
"""
With a file of this size, it will be faster to compute on a single machine than on several machines.
The advantage of parallelization will show on much bigger files (this one is 7GB big).
"""

In [None]:
"""
Deserialization
"""
import numpy as np
"""Code for packing and unpacking a numpy array into a byte array.
   the array is flattened if it is not 1D.
   This is intended to be used as the interface for storing numpy arrays in binary form.
   
   This code is intended to be used to store numpy array as fields in a dataframe and then store the 
   dataframes in a parquet file.
"""
def packArray(a):
    """
    Pack a numpy array into a bytearray that can be stored as a single
    field in a spark DataFrame.

    Only the raw bytes of the array are kept: the result carries neither the
    shape nor the dtype of ``a``, so the reader must know the dtype to decode it.

    :param a: a numpy ndarray (instances of ndarray subclasses are accepted too)
    :returns: the bytes of ``a``
    :rtype: bytearray
    :raises TypeError: if ``a`` is not a numpy ndarray

    """
    # isinstance, unlike an exact type comparison, also accepts ndarray subclasses.
    if not isinstance(a, np.ndarray):
        # TypeError is a subclass of Exception, so callers catching Exception still work.
        raise TypeError("input to packArray should be numpy.ndarray. It is instead "+str(type(a)))
    return bytearray(a.tobytes())

def unpackArray(x,data_type=np.float16):
    """
    Decode the raw bytes in ``x`` back into a numpy array.

    :param x: a bytearray (any bytes-like buffer) holding the array's raw bytes
    :param data_type: dtype used to interpret the bytes; it fixes how many
        bytes make up each entry, so it must match the dtype used when packing.
    :returns: the decoded values
    :rtype: a numpy ndarray of dtype data_type.

    """
    decoded = np.frombuffer(x, dtype=data_type)
    return decoded

In [None]:
def parse_weather(line):
    """
    Parse one CSV line of the weather file.

    A valid line has 368 comma-separated fields: three leading fields (the third
    is converted to int) followed by 365 values, where an empty value stands for
    a missing measurement and becomes NaN.

    :param line: one line of the CSV file, as a string
    :returns: ``(0, [field0, field1, int(field2), packed_values])`` when parsing
        succeeds, where ``packed_values`` is the float16 array of the 365 values
        packed by ``packArray``; ``(1, line)`` when the line cannot be parsed.
    :rtype: tuple
    """
    n_fields = 368   # expected number of fields per line
    n_header = 3     # leading non-numeric-array fields

    fields = line.split(',')
    # Explicit check rather than `assert`, which is stripped when Python runs with -O.
    if len(fields) != n_fields:
        return (1, line)
    try:
        year = int(fields[2])
        values = np.array(
            [np.float16(v) if v != '' else np.nan for v in fields[n_header:]],
            dtype=np.float16)
    except Exception:
        # Best effort: any conversion error marks the row as bad, returning
        # (1, input line). Unlike a bare `except:`, this does not swallow
        # KeyboardInterrupt / SystemExit.
        return (1, line)
    # if parsing OK, return (0, parsed data)
    return (0, [fields[0], fields[1], year, packArray(values)])

In [None]:
%%time
### Parsing the CSV file specificly for weather : All.csv:
Parsed=RDD.map(parse_weather).cache() # filter out bad rows which are mapped (1,line)
DATA=Parsed.filter(lambda x:x[0]==0).map(lambda x:x[1])
ERRORS=Parsed.filter(lambda x:x[0]==1).map(lambda x:x[1])

In [None]:
# Execution plan (lineage) of DATA.
# toDebugString() is a method that returns the description itself (bytes in PySpark),
# not an RDD, so there is nothing to collect().
plan = DATA.toDebugString()
print(plan.decode('utf-8') if isinstance(plan, bytes) else plan)

In [None]:
# RDD of precipitation (rain) records.
# Filter the parsed records, whose field 1 is the measurement name. On the raw text
# RDD, row[1] is the second character of the line and can never equal 'PRCP'.
PRCP = DATA.filter(lambda row: row[1] == 'PRCP')
print('PRCP records : ', PRCP.count())
# The RDD of unparsable lines is named ERRORS.
print('ERROR records : ', ERRORS.count())

In [None]:
# Show what one parsed record looks like: the first three CSV fields (the third as an int)
# followed by the packed bytearray of values, as built by parse_weather.
DATA.take(1)

In [None]:
"""
- Create a parquet file on HDFS, store it on S3, then close the cluster
"""
from pyspark.sql import SQLContext
from pyspark.sql.types import Row, StructField, StructType, StringType, IntegerType, BinaryType, FloatType

# Just like using Spark requires having a SparkContext, using SQL requires an SQLContext
sqlContext = SQLContext(sc)
sqlContext
#creation of schema
schema = StructType([StructField("Station",     StringType(), True),
                     StructField("Measurement", StringType(), True),
                     StructField("Year",        IntegerType(),True),
                     StructField("Values",      BinaryType(),True)
                    ])
schema

# Create a DataFrame by applying the schema to the RDD and print the schema
ALL_DataFrame = sqlContext.createDataFrame(DATA, schema)
ALL_DataFrame.printSchema()

#remove previous parquet files 
%%time
!hadoop fs -rm -r /weather/weather.parquet

# creating a HDFS of parquet files
outfilename="hdfs:///weather/weather.parquet"
ALL_DataFrame.write.save(outfilename)

In [None]:
"""
- Copy the HDFS parquet file to AWS S3 (long-term storage)
"""
# Remove any previous copy of weather.parquet from the head node, then list the directory.
# NOTE(review): this working directory differs from the .../notebooks/Data/ directory
# used when ALL.csv was staged -- confirm it is the intended one.
%cd /mnt/workspace/Data/
!rm -rf weather.parquet/
!ls -lrt

# Copy the parquet directory from HDFS to the head node
!hadoop fs -copyToLocal /weather/weather.parquet weather.parquet
# Remove the previous parquet directory from S3.
# NOTE(review): this runs before the upload, and --quiet suppresses the command's output.
!aws s3 rm --recursive --quiet s3://dse-weather/weather.parquet
# Copy the parquet directory from the head node to S3
!aws s3 cp --recursive --quiet ./weather.parquet s3://dse-weather/weather.parquet
