In [27]:
import os

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import (
    StructType, StructField, 
    StringType, IntegerType, DoubleType,
    DateType, FloatType,DataType, TimestampNTZType, TimestampType
    )

Criando a sessão Spark

In [28]:
# Start (or reuse) the Spark session and put the PostgreSQL JDBC driver jar on
# its classpath via `spark.jars`.
spark = (
    SparkSession.builder
    .appName('SparkByExamples.com')
    .config("spark.jars", "C:\\Program Files (x86)\\PostgreSQL\\pgJDBC\\postgresql-42.7.2.jar")
    .getOrCreate()
)

In [29]:
# Display the session object as the cell output to confirm it was created.
spark

Propriedades

In [1]:
def jdbc_properties(env=None):
    """Build the JDBC connection properties used by the PostgreSQL writes.

    Credentials are read from environment variables so that they are not
    stored in the notebook:

    * ``POSTGRES_USER``: database user; defaults to ``"postgres"``.
    * ``POSTGRES_PASSWORD``: database password; when unset or empty, no
      ``"password"`` entry is added to the result.

    Parameters
    ----------
    env : mapping, optional
        Where the variables are looked up. Defaults to ``os.environ``.

    Returns
    -------
    dict
        Properties containing ``"user"``, ``"driver"`` and, when a password
        is available, ``"password"``.
    """
    if env is None:
        env = os.environ
    props = {
        "user": env.get("POSTGRES_USER", "postgres"),
        "driver": "org.postgresql.Driver",
    }
    password = env.get("POSTGRES_PASSWORD")
    if password:
        # Only send a password when one was actually provided.
        props["password"] = password
    return props


# JDBC URL of the target database and the connection properties shared by
# every `.write.jdbc(...)` call in this notebook.
url = "jdbc:postgresql://localhost:5432/ecommerce"
properties = jdbc_properties()

### Lendo os CSVs com Spark e gravando no PostgreSQL

#### olistCustomers

In [31]:
# Load the cleaned customers CSV (first line is the header, comma-separated).
# No schema is supplied and inference is off, so every column is read as a
# string. `inferSchema` is the option name the CSV reader recognises; an
# unknown key such as `infer` would be silently ignored.
olistCustomers = (
    spark.read.format("csv")
    .options(header="true", delimiter=',', inferSchema='false')
    .load("dataset_limpo_para_analise\\olist_customers.csv")
)

In [32]:
# Write the customers DataFrame to PostgreSQL, overwriting `olist_Customers`
# if it already exists.
olistCustomers.write.mode("overwrite").jdbc(url, "olist_Customers", properties=properties)

#### olistOrderItems
Tipando os atributos

In [33]:
# Explicit column types for olist_order_items.csv, one `.add(name, type, nullable)`
# per column. `shipping_limit_date` is kept as a string here.
# NOTE(review): price/freight_value use FloatType (32-bit); consider a wider or
# decimal type if exact monetary sums matter.
schema = (
    StructType()
    .add("order_id", StringType(), False)
    .add("order_item_id", IntegerType(), False)
    .add("product_id", StringType(), False)
    .add("seller_id", StringType(), False)
    .add("shipping_limit_date", StringType(), True)
    .add("price", FloatType(), False)
    .add("freight_value", FloatType(), False)
)

In [34]:
# Load the cleaned order-items CSV using the explicit `schema` defined in the
# previous cell. `inferSchema` is the option name the CSV reader recognises
# (an unknown key such as `infer` is silently ignored); it stays off because
# the column types come from `schema`.
olistOrderItems = (
    spark.read.format("csv")
    .options(header="true", delimiter=',', inferSchema='false')
    .schema(schema)
    .load("dataset_limpo_para_analise\\olist_order_items.csv")
)

In [35]:
# Write the order-items DataFrame to PostgreSQL, overwriting
# `olist_Order_Items` if it already exists.
olistOrderItems.write.mode("overwrite").jdbc(url, "olist_Order_Items", properties=properties)

#### olistOrderPayments

In [36]:
# Explicit column types for olist_order_payments.csv, one
# `.add(name, type, nullable)` per column.
# NOTE(review): payment_value uses FloatType (32-bit); consider a wider or
# decimal type if exact monetary sums matter.
schema = (
    StructType()
    .add("order_id", StringType(), False)
    .add("payment_sequential", IntegerType(), False)
    .add("payment_type", StringType(), False)
    .add("payment_installments", IntegerType(), False)
    .add("payment_value", FloatType(), False)
)

In [37]:
# Load the cleaned order-payments CSV using the explicit `schema` defined in
# the previous cell. `inferSchema` is the option name the CSV reader
# recognises (an unknown key such as `infer` is silently ignored); it stays
# off because the column types come from `schema`.
olistOrderPayments = (
    spark.read.format("csv")
    .options(header="true", delimiter=',', inferSchema='false')
    .schema(schema)
    .load("dataset_limpo_para_analise\\olist_order_payments.csv")
)

In [38]:
# Write the order-payments DataFrame to PostgreSQL, overwriting
# `olist_Order_Payments` if it already exists.
olistOrderPayments.write.mode("overwrite").jdbc(url, "olist_Order_Payments", properties=properties)

#### olistOrderReview

In [39]:
# Explicit column types for olist_order_review.csv, one
# `.add(name, type, nullable)` per column; both date columns are parsed as
# timestamps.
schema = (
    StructType()
    .add("review_id", StringType(), False)
    .add("order_id", StringType(), False)
    .add("review_score", IntegerType(), False)
    .add("review_comment_title", StringType(), False)
    .add("review_comment_message", StringType(), False)
    .add("review_creation_date", TimestampType(), False)
    .add("review_answer_timestamp", TimestampType(), False)
)

In [40]:
# Load the cleaned order-reviews CSV using the explicit `schema` defined in
# the previous cell. `multiline` lets a quoted field span several lines
# (presumably needed because review comments can contain line breaks).
# `inferSchema` is the option name the CSV reader recognises (an unknown key
# such as `infer` is silently ignored); it stays off because the column types
# come from `schema`.
olistOrderReviews = (
    spark.read.format("csv")
    .options(header="true", delimiter=',', inferSchema='false', multiline='true')
    .schema(schema)
    .load("dataset_limpo_para_analise\\olist_order_review.csv")
)

In [41]:
# Write the order-reviews DataFrame to PostgreSQL, overwriting
# `olist_Order_Reviews` if it already exists.
olistOrderReviews.write.mode("overwrite").jdbc(url, "olist_Order_Reviews", properties=properties)

#### olistSellers

In [42]:
# Load the cleaned sellers CSV (first line is the header, comma-separated).
# No schema is supplied and inference is off, so every column is read as a
# string. `inferSchema` is the option name the CSV reader recognises; an
# unknown key such as `infer` would be silently ignored.
olistSeller = (
    spark.read.format("csv")
    .options(header="true", delimiter=',', inferSchema='false')
    .load("dataset_limpo_para_analise\\olist_sellers.csv")
)

In [43]:
# Write the sellers DataFrame to PostgreSQL, overwriting `olist_Seller` if it
# already exists.
olistSeller.write.mode("overwrite").jdbc(url, "olist_Seller", properties=properties)

#### olistProducts

In [44]:
# Load the cleaned products CSV (first line is the header, comma-separated).
# No schema is supplied and inference is off, so every column is read as a
# string. `inferSchema` is the option name the CSV reader recognises; an
# unknown key such as `infer` would be silently ignored.
# NOTE(review): no JDBC write for this DataFrame is visible in this part of
# the notebook, unlike the other tables - confirm whether that is intended.
olistProducts = (
    spark.read.format("csv")
    .options(header="true", delimiter=',', inferSchema='false')
    .load("dataset_limpo_para_analise\\olist_products.csv")
)

In [45]:
# Display the DataFrame's column names and types as the cell output.
olistProducts

DataFrame[_c0: string, product_id: string, product_category_name: string, product_name_lenght: string, product_description_lenght: string, product_photos_qty: string, product_weight_g: string, product_length_cm: string, product_height_cm: string, product_width_cm: string]

#### olistGeolocation

In [46]:
# Explicit column types for the geolocation CSV, one
# `.add(name, type, nullable)` per column. The zip-code prefix is typed as a
# string and latitude/longitude as doubles.
schema = (
    StructType()
    .add("geolocation_zip_code_prefix", StringType(), False)
    .add("geolocation_lat", DoubleType(), False)
    .add("geolocation_lng", DoubleType(), False)
    .add("geolocation_city", StringType(), False)
    .add("geolocation_state", StringType(), False)
)

In [47]:
# Load the geolocation CSV using the explicit `schema` defined in the previous
# cell. `inferSchema` is the option name the CSV reader recognises (an unknown
# key such as `infer` is silently ignored); it stays off because the column
# types come from `schema`.
# NOTE(review): this file is read from `dataset\`, not from
# `dataset_limpo_para_analise\` like the other tables - confirm intended.
olistGeolocation = (
    spark.read.format("csv")
    .options(header="true", delimiter=',', inferSchema='false')
    .schema(schema)
    .load("dataset\\olist_geolocation_dataset.csv")
)

In [48]:
# Print the first two rows to sanity-check the parsed columns.
olistGeolocation.show(2)

+---------------------------+-------------------+------------------+----------------+-----------------+
|geolocation_zip_code_prefix|    geolocation_lat|   geolocation_lng|geolocation_city|geolocation_state|
+---------------------------+-------------------+------------------+----------------+-----------------+
|                      01037| -23.54562128115268|-46.63929204800168|       sao paulo|               SP|
|                      01046|-23.546081127035535|-46.64482029837157|       sao paulo|               SP|
+---------------------------+-------------------+------------------+----------------+-----------------+
only showing top 2 rows



In [49]:
# Write the geolocation DataFrame to PostgreSQL, overwriting
# `olist_Geolocation` if it already exists.
olistGeolocation.write.mode("overwrite").jdbc(url, "olist_Geolocation", properties=properties)

#### olistOrder 

In [60]:
# Explicit column types for olist_order.csv, one `.add(name, type, nullable)`
# per column. The approval and delivery timestamps are declared nullable; the
# purchase and estimated-delivery timestamps are not.
schema = (
    StructType()
    .add("order_id", StringType(), False)
    .add("customer_id", StringType(), False)
    .add("order_status", StringType(), False)
    .add("order_purchase_timestamp", TimestampType(), False)
    .add("order_approved_at", TimestampType(), True)
    .add("order_delivered_carrier_date", TimestampType(), True)
    .add("order_delivered_customer_date", TimestampType(), True)
    .add("order_estimated_delivery_date", TimestampType(), False)
    .add("status_", StringType(), False)
)
# Show the resulting schema as the cell output.
schema

StructType([StructField('order_id', StringType(), False), StructField('customer_id', StringType(), False), StructField('order_status', StringType(), False), StructField('order_purchase_timestamp', TimestampType(), False), StructField('order_approved_at', TimestampType(), True), StructField('order_delivered_carrier_date', TimestampType(), True), StructField('order_delivered_customer_date', TimestampType(), True), StructField('order_estimated_delivery_date', TimestampType(), False), StructField('status_', StringType(), False)])

In [61]:
# Load the cleaned orders CSV using the explicit `schema` defined in the
# previous cell. `inferSchema` is the option name the CSV reader recognises
# (an unknown key such as `infer` is silently ignored); it stays off because
# the column types come from `schema`.
olistOrder = (
    spark.read.format('csv')
    .options(header="true", delimiter=',', inferSchema='false')
    .schema(schema)
    .load("dataset_limpo_para_analise\\olist_order.csv")
)

In [62]:
# Write the orders DataFrame to PostgreSQL, overwriting `olist_order` if it
# already exists.
olistOrder.write.mode("overwrite").jdbc(url, "olist_order", properties=properties)

In [63]:
# Print the first two rows to sanity-check the parsed columns.
olistOrder.show(2)

+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+-------+
|            order_id|         customer_id|order_status|order_purchase_timestamp|  order_approved_at|order_delivered_carrier_date|order_delivered_customer_date|order_estimated_delivery_date|status_|
+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+-------+
|e481f51cbdc54678b...|9ef432eb625129730...|    entregue|     2017-10-02 10:56:33|2017-10-02 11:07:15|         2017-10-04 19:55:00|          2017-10-10 21:25:13|          2017-10-18 00:00:00|     ok|
|53cdb2fc8bc7dce0b...|b0830fb4747a6c6d2...|    entregue|     2018-07-24 20:41:37|2018-07-26 03:24:27|         2018-07-26 14:31:00|          2018-08-07 15:27:45|          2018-08-13 00:00:00|     ok|
+----