In [None]:
En la presente tarea, se realiza una conexión con la base de datos postgres y un join con un archivo csv.

Nota: Para realizar la conexión con la base de datos se agrego el driver de conexión de postgre en la siguiente ruta:
C:\Spark\jars

In [1]:
# librerias de pyspark
import findspark
findspark.init('C:\Spark')

from datetime import datetime
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, date_format, udf 
from pyspark.sql.types import DateType



In [2]:
# configuracion del driver de conexión de postgres
spark = SparkSession \
    .builder \
    .appName("Basic JDBC pipeline") \
    .config("spark.driver.extraClassPath", "C:/Users/JuanPablo/BigDataTEC-master/Tarea3/postgresql-42.2.9.jar") \
    .config("spark.executor.extraClassPath", "C:/Users/JuanPablo/BigDataTEC-master/Tarea3/postgresql-42.2.9.jar") \
    .getOrCreate()



In [3]:
# parametros de conexión y lectura de tabla de la base de datos

df = spark \
    .read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://localhost/postgres") \
    .option("user", "postgres") \
    .option("password", "admin") \
    .option("dbtable", "transactions") \
    .load()

df.show()

+---+-----------+------+-------------------+
| id|customer_id|amount|       purchased_at|
+---+-----------+------+-------------------+
|  1|          1|    55|2017-03-01 09:00:00|
|  2|          1|   125|2017-03-01 10:00:00|
|  3|          1|    32|2017-03-02 13:00:00|
|  4|          1|    64|2017-03-02 15:00:00|
|  5|          1|   128|2017-03-03 10:00:00|
|  6|          2|   333|2017-03-01 09:00:00|
|  7|          2|   334|2017-03-01 09:01:00|
|  8|          2|   333|2017-03-01 09:02:00|
|  9|          2|    11|2017-03-03 20:00:00|
| 10|          2|    44|2017-03-03 20:15:00|
+---+-----------+------+-------------------+



In [4]:
# se agrega una columna nueva de la fecha con su respectivo formato
formatted_df = df.withColumn("date_string", date_format(col("purchased_at"), 'MM/dd/yyyy'))
formatted_df.show()

+---+-----------+------+-------------------+-----------+
| id|customer_id|amount|       purchased_at|date_string|
+---+-----------+------+-------------------+-----------+
|  1|          1|    55|2017-03-01 09:00:00| 03/01/2017|
|  2|          1|   125|2017-03-01 10:00:00| 03/01/2017|
|  3|          1|    32|2017-03-02 13:00:00| 03/02/2017|
|  4|          1|    64|2017-03-02 15:00:00| 03/02/2017|
|  5|          1|   128|2017-03-03 10:00:00| 03/03/2017|
|  6|          2|   333|2017-03-01 09:00:00| 03/01/2017|
|  7|          2|   334|2017-03-01 09:01:00| 03/01/2017|
|  8|          2|   333|2017-03-01 09:02:00| 03/01/2017|
|  9|          2|    11|2017-03-03 20:00:00| 03/03/2017|
| 10|          2|    44|2017-03-03 20:15:00| 03/03/2017|
+---+-----------+------+-------------------+-----------+



In [5]:
# se agrega una columna nueva de fecha
string_to_date = \
    udf(lambda text_date: datetime.strptime(text_date, '%m/%d/%Y'),
        DateType())

typed_df = formatted_df.withColumn("date", string_to_date(formatted_df.date_string))
typed_df.show()
typed_df.printSchema()

+---+-----------+------+-------------------+-----------+----------+
| id|customer_id|amount|       purchased_at|date_string|      date|
+---+-----------+------+-------------------+-----------+----------+
|  1|          1|    55|2017-03-01 09:00:00| 03/01/2017|2017-03-01|
|  2|          1|   125|2017-03-01 10:00:00| 03/01/2017|2017-03-01|
|  3|          1|    32|2017-03-02 13:00:00| 03/02/2017|2017-03-02|
|  4|          1|    64|2017-03-02 15:00:00| 03/02/2017|2017-03-02|
|  5|          1|   128|2017-03-03 10:00:00| 03/03/2017|2017-03-03|
|  6|          2|   333|2017-03-01 09:00:00| 03/01/2017|2017-03-01|
|  7|          2|   334|2017-03-01 09:01:00| 03/01/2017|2017-03-01|
|  8|          2|   333|2017-03-01 09:02:00| 03/01/2017|2017-03-01|
|  9|          2|    11|2017-03-03 20:00:00| 03/03/2017|2017-03-03|
| 10|          2|    44|2017-03-03 20:15:00| 03/03/2017|2017-03-03|
+---+-----------+------+-------------------+-----------+----------+

root
 |-- id: integer (nullable = true)
 |-- cu

In [6]:
# Se suman los datos por medio de la funcion groupby
sum_df = typed_df.groupBy("customer_id", "date").sum()
sum_df.show()

+-----------+----------+-------+----------------+-----------+
|customer_id|      date|sum(id)|sum(customer_id)|sum(amount)|
+-----------+----------+-------+----------------+-----------+
|          2|2017-03-01|     21|               6|       1000|
|          1|2017-03-02|      7|               2|         96|
|          1|2017-03-03|      5|               1|        128|
|          1|2017-03-01|      3|               2|        180|
|          2|2017-03-03|     19|               4|         55|
+-----------+----------+-------+----------------+-----------+



In [7]:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.jars", "C:\Spark\postgresql-42.2.9.jar") \
    .getOrCreate()

df = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://localhost/postgres") \
    .option("dbtable", "transactions") \
    .option("user", "postgres") \
    .option("password", "admin") \
    .option("driver", "org.postgresql.Driver") \
    .load()

df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- amount: integer (nullable = true)
 |-- purchased_at: timestamp (nullable = true)



In [10]:
stats_df = \
    sum_df.select(
        col('customer_id'),
        col('date'),
        col('sum(amount)').alias('amount'))

stats_df.printSchema()
stats_df.show()

root
 |-- customer_id: integer (nullable = true)
 |-- date: date (nullable = true)
 |-- amount: long (nullable = true)

+-----------+----------+------+
|customer_id|      date|amount|
+-----------+----------+------+
|          2|2017-03-01|  1000|
|          1|2017-03-02|    96|
|          1|2017-03-03|   128|
|          1|2017-03-01|   180|
|          2|2017-03-03|    55|
+-----------+----------+------+



In [11]:
# cargamos una fuente de datos en un archivo csv

from pyspark.sql.types import IntegerType, StringType, StructField, StructType

names_df = spark \
    .read \
    .format("csv") \
    .option("path", "names.csv") \
    .option("header", True) \
    .schema(StructType([
                StructField("id", IntegerType()),
                StructField("name", StringType()),
                StructField("currency", StringType())])) \
    .load()

names_df.printSchema()
names_df.show()

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- currency: string (nullable = true)

+---+----+--------+
| id|name|currency|
+---+----+--------+
|  1|John|     CRC|
|  2|Jane|     EUR|
+---+----+--------+



In [12]:
# Unimos las tablas de las diferentes fuentes de datos, tanto de la base de datos con del csv

joint_df = stats_df.join(names_df, stats_df.customer_id == names_df.id)
joint_df.printSchema()
joint_df.show()

root
 |-- customer_id: integer (nullable = true)
 |-- date: date (nullable = true)
 |-- amount: long (nullable = true)
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- currency: string (nullable = true)

+-----------+----------+------+---+----+--------+
|customer_id|      date|amount| id|name|currency|
+-----------+----------+------+---+----+--------+
|          2|2017-03-01|  1000|  2|Jane|     EUR|
|          1|2017-03-02|    96|  1|John|     CRC|
|          1|2017-03-03|   128|  1|John|     CRC|
|          1|2017-03-01|   180|  1|John|     CRC|
|          2|2017-03-03|    55|  2|Jane|     EUR|
+-----------+----------+------+---+----+--------+

