In [2]:
# findspark puts the local Spark installation's pyspark on sys.path; it has to
# run before pyspark is imported (next cell).
import findspark
findspark.init()

In [3]:
import os
import re
from datetime import datetime
from io import BytesIO
from zipfile import BadZipFile, ZipFile

from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import (
    LongType, StringType, StructField, StructType, TimestampType,
)

In [4]:
# Local Spark with 4 worker threads. getOrCreate() reuses an already running
# context when this cell is executed again (SparkContext(conf=...) would raise
# "Cannot run multiple SparkContexts at once"); note that in that case the
# existing context's configuration is kept.
conf = SparkConf().setMaster("local[4]").setAppName("transport")
sc = SparkContext.getOrCreate(conf=conf)

# Creating the SQL context is what makes rdd.toDF() available further down.
sqlContext = SQLContext(sc)

In [5]:
def build_filename(raw_name):
    """
    Build a "day=<a>/from=<b>/to=<c>" partition path from the three runs of
    digits found (in order) in ``raw_name``.

    Raises ValueError when ``raw_name`` does not contain exactly three digit runs.
    """
    digit_runs = tuple(re.findall(r'\d+', raw_name))
    # Unpacking enforces exactly three runs (ValueError otherwise).
    day, from_day, to_day = digit_runs
    partition_template = "day=%s/from=%s/to=%s"
    return partition_template % (day, from_day, to_day)

In [6]:
def extract_files(compressed_name, stream):
    """
    Unpack every member of an in-memory .zip archive.

    :param compressed_name: path/URI of the archive, as produced by
        ``SparkContext.binaryFiles``. Only its base name is used to build the
        partition path, so digits in parent directories, host names or ports
        cannot end up in (or break) ``build_filename``.
    :param stream: raw bytes of the archive.
    :return: list of ``(partition_path, member_bytes)`` pairs, where
        ``partition_path`` is ``build_filename(<archive base name> + <member name>)``.
        An empty list is returned when the bytes are not a valid zip archive or
        a combined name does not contain exactly three digit runs, so a single
        bad archive does not fail the whole Spark job.
    """
    archive_name = os.path.basename(compressed_name)
    try:
        # A fresh BytesIO is already positioned at 0, so no seek is needed.
        with BytesIO(stream) as buffer, ZipFile(buffer, mode='r') as zipf:
            return [(build_filename(archive_name + member), zipf.read(member))
                    for member in zipf.namelist()]
    except (BadZipFile, ValueError):
        # BadZipFile: corrupt or non-zip input.
        # ValueError: build_filename did not find exactly three numbers.
        return []

In [7]:
def prepare_csv(file_name, table, date_format="%d/%m/%Y %H:%M:%S"):
    """
    Convert the string rows of one CSV file into typed rows.

    Every output row starts with ``file_name`` (in this notebook, the partition
    path produced by ``build_filename``), followed by:
    row[0] parsed as a datetime with ``date_format``, row[1] and row[3] cast to
    int, and row[2], row[4], row[5] kept as strings. Fields after row[5] are
    ignored.

    :param file_name: value prepended to every row.
    :param table: iterable of rows, each a list of at least 6 strings.
    :param date_format: ``datetime.strptime`` format of the first field.
    :return: list of 7-element lists.
    :raises ValueError: if the date or an integer field cannot be parsed.
    :raises IndexError: if a row has fewer than 6 fields.
    """
    return [
        [file_name,
         datetime.strptime(row[0], date_format),
         int(row[1]),
         row[2],
         int(row[3]),
         row[4],
         row[5]]
        for row in table
    ]

In [8]:
path = 'datasets/test-folder.small/*.zip'
# binaryFiles yields (archive path, archive bytes) pairs; unpack each pair
# directly into extract_files, which emits (partition path, member bytes).
rdd = sc.binaryFiles(path).flatMap(lambda pair: extract_files(*pair))
# toDebugString() returns bytes, hence the decode before printing.
print(rdd.toDebugString().decode())

(2) PythonRDD[1] at RDD at PythonRDD.scala:53 []
 |  datasets/test-folder.small/*.zip BinaryFileRDD[0] at binaryFiles at NativeMethodAccessorImpl.java:0 []


In [9]:
# Number of partitions of the archive RDD built above.
rdd.getNumPartitions()

2

In [10]:
# Decode each CSV member from cp1252 bytes and split it into a list of lines.
# The bytes object is decoded directly; a BytesIO(...).read() round-trip would
# only copy it. Splitting on '\n' yields a final empty string when a file ends
# with a newline, and (if the files use CRLF endings) leaves a '\r' on each
# line; the following cells drop the last ';'-field of every line and remove
# the trailing empty line.
rdd = rdd.mapValues(lambda content: content.decode('cp1252').split('\n'))

In [11]:
# Split every line into its ';'-separated fields. The last fragment is
# discarded: presumably each line ends with a trailing ';' (possibly followed
# by '\r'), making that fragment empty. TODO confirm against the raw files.
rdd = rdd.mapValues(lambda lines: [line.split(";")[:-1] for line in lines])

In [12]:
# Drop the header row, then drop empty rows (a blank line splits to []).
# Filtering empties, rather than slicing off the last row, removes the
# trailing blank line when a file ends with a newline without discarding the
# last data row when it does not, and also skips blank lines mid-file that
# would otherwise raise IndexError in prepare_csv.
rdd = rdd.mapValues(lambda table: [row for row in table[1:] if row])

In [13]:
# Expand every (partition path, rows) pair into typed rows tagged with that path.
rdd = rdd.flatMap(lambda pair: prepare_csv(*pair))

In [14]:
# DataFrame column names, lower-cased: the partition path first, then the
# fields in the order prepare_csv emits them.
header = [column.lower() for column in (
    'FILE_NAME',
    'FECHAHORATRX',
    'CODIGOENTIDAD',
    'NOMBREENTIDAD',
    'CODIGOSITIO',
    'NOMBRESITIO',
    'NROTARJETA',
)]

In [15]:
%%time
df = rdd.toDF(header)

CPU times: user 232 ms, sys: 2.47 ms, total: 234 ms
Wall time: 2.72 s


In [16]:
# Inspect the column names and types of the freshly built DataFrame.
df.printSchema()

root
 |-- file_name: string (nullable = true)
 |-- fechahoratrx: timestamp (nullable = true)
 |-- codigoentidad: long (nullable = true)
 |-- nombreentidad: string (nullable = true)
 |-- codigositio: long (nullable = true)
 |-- nombresitio: string (nullable = true)
 |-- nrotarjeta: string (nullable = true)



In [17]:
%%time
# Preview the first rows; long values (file_name, nrotarjeta) are truncated in the display.
df.show()

+--------------------+-------------------+-------------+-------------+-----------+-----------+--------------------+
|           file_name|       fechahoratrx|codigoentidad|nombreentidad|codigositio|nombresitio|          nrotarjeta|
+--------------------+-------------------+-------------+-------------+-----------+-----------+--------------------+
|day=20190311/from...|2019-03-01 01:20:44|           16|    U3 - Vule|       2308|    BJFZ-75|37d3a2cd6ba280f3a...|
|day=20190311/from...|2019-03-01 01:20:47|           16|    U3 - Vule|       2308|    BJFZ-75|7464f0fb6fe00e347...|
|day=20190311/from...|2019-03-01 01:20:50|           16|    U3 - Vule|       2308|    BJFZ-75|3ce1dbdd2d904a953...|
|day=20190311/from...|2019-03-01 01:20:53|           16|    U3 - Vule|       2308|    BJFZ-75|dd9730ec384f81487...|
|day=20190311/from...|2019-03-01 01:21:00|           16|    U3 - Vule|       2308|    BJFZ-75|7f4ded632ab214988...|
|day=20190311/from...|2019-03-01 01:21:06|           16|    U3 - Vule|  

In [19]:
# Partition count of the DataFrame's underlying RDD; presumably inherited from
# the 2-partition archive RDD above (verify if the input changes).
df.rdd.getNumPartitions()

2

In [20]:
%%time
df = df.repartition(4)

CPU times: user 1.2 ms, sys: 0 ns, total: 1.2 ms
Wall time: 2.34 ms


In [21]:
%%time
days = [file.file_name for file in df.select('file_name').distinct().collect()]
days

CPU times: user 18.7 ms, sys: 10.2 ms, total: 28.9 ms
Wall time: 2.06 s


In [22]:
%%time
df.groupBy('file_name').count().show()

+--------------------+-----+
|           file_name|count|
+--------------------+-----+
|day=20190311/from...|   99|
|day=20180818/from...|   99|
+--------------------+-----+

CPU times: user 18.5 ms, sys: 7.1 ms, total: 25.6 ms
Wall time: 1.28 s


In [25]:
%%time
for directory in days:
    if not os.path.exists(directory):
        os.makedirs(directory)
    df_day = df.select(df.columns[1:]).where(df.file_name == directory)
    df_day.write.parquet(directory + "/data.parquet", compression="gzip")

CPU times: user 6.01 ms, sys: 4.72 ms, total: 10.7 ms
Wall time: 1.66 s


In [23]:
%%time
for directory in days:
    print("%s=%d" % (directory, df.select(df.columns[1:]).where(df.file_name == directory).count()))

day=20190311/from=20190307/to=20190309=99
day=20180818/from=20180812/to=20180814=99
CPU times: user 9.37 ms, sys: 1.58 ms, total: 11 ms
Wall time: 1.28 s


In [24]:
# The schema is unchanged by the repartition and the per-path writes above.
df.printSchema()

root
 |-- file_name: string (nullable = true)
 |-- fechahoratrx: timestamp (nullable = true)
 |-- codigoentidad: long (nullable = true)
 |-- nombreentidad: string (nullable = true)
 |-- codigositio: long (nullable = true)
 |-- nombresitio: string (nullable = true)
 |-- nrotarjeta: string (nullable = true)

