In [2]:
import findspark 
findspark.init()
from pyspark.sql import SparkSession
from pyspark import SparkConf, SparkContext

# Build the session for this notebook, or reuse one that is already running
# in the kernel; the app name is what shows up in the Spark UI.
spark = (
    SparkSession.builder
    .appName('i_s3_postgres')
    .getOrCreate()
)
# Last expression of the cell: display the session summary.
spark

In [3]:
from pyspark import SparkContext
# Fetch the active SparkContext (getOrCreate only instantiates a new one when
# none exists yet) and display it as the cell output.
sc = SparkContext.getOrCreate()
sc

In [4]:
# --- Postgres connection settings -------------------------------------------
# NOTE(review): these look like local development credentials; move them to
# environment variables or a config file before sharing this notebook.
db_name = 'innovation'
user = 'innovation'
password = 'innovation'
table_name = 'quipu_traficmodelk'

url = "jdbc:postgresql://localhost:5432/{}".format(db_name)
credentials = dict(user=user, password=password)
# Keyword arguments shared by spark.read.jdbc here and DataFrameWriter.jdbc later on.
connection_db = dict(url=url, table=table_name, properties=credentials)

# Load the whole table and leave out the `id` column; every other column is kept.
df_postgres = spark.read.jdbc(**connection_db).drop('id')
df_postgres.show()

+----------+----+-----+---+-------------+---------------+------------------+---------------+----------+----------------+------------------+
|article_id|path|order|dia|page_avg_time|page_avg_scroll|page_scroll_starts|page_total_time|page_views|page_views_loyal|page_views_quality|
+----------+----+-----+---+-------------+---------------+------------------+---------------+----------+----------------+------------------+
+----------+----+-----+---+-------------+---------------+------------------+---------------+----------+----------------+------------------+



In [6]:
# Print the column names and types Spark derived from the JDBC table.
df_postgres.printSchema()

root
 |-- article_id: integer (nullable = true)
 |-- path: string (nullable = true)
 |-- order: integer (nullable = true)
 |-- dia: array (nullable = true)
 |    |-- element: timestamp (containsNull = true)
 |-- page_avg_time: array (nullable = true)
 |    |-- element: integer (containsNull = true)
 |-- page_avg_scroll: array (nullable = true)
 |    |-- element: integer (containsNull = true)
 |-- page_scroll_starts: array (nullable = true)
 |    |-- element: integer (containsNull = true)
 |-- page_total_time: array (nullable = true)
 |    |-- element: integer (containsNull = true)
 |-- page_views: array (nullable = true)
 |    |-- element: integer (containsNull = true)
 |-- page_views_loyal: array (nullable = true)
 |    |-- element: integer (containsNull = true)
 |-- page_views_quality: array (nullable = true)
 |    |-- element: integer (containsNull = true)



In [7]:
import os
import configparser

# os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages "org.apache.hadoop:hadoop-aws:2.7.6" pyspark-shell'

# Read the AWS key pair of one profile from the shared credentials file in the
# user's home directory. ConfigParser.read() silently skips a missing file, so
# a missing file only surfaces as an error in the config.get() lookups below.
aws_profile = 'default'

config = configparser.ConfigParser()
config.read(os.path.expanduser("~/.aws/credentials"))

access_key, secret_key = (
    config.get(aws_profile, option)
    for option in ("aws_access_key_id", "aws_secret_access_key")
)

In [8]:
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import DoubleType, IntegerType, StringType

bucket_name = "charbeat-trafic"
key_file = "gestion.pe/*.csv"

# Pass the AWS credentials to the s3n connector through the Hadoop
# configuration instead of embedding them in the URL. Credentials inside the
# URL end up in logs, query plans and error messages, and a secret key that
# contains '/' makes the URL unparsable.
hadoop_conf = spark.sparkContext._jsc.hadoopConfiguration()
hadoop_conf.set("fs.s3n.awsAccessKeyId", access_key)
hadoop_conf.set("fs.s3n.awsSecretAccessKey", secret_key)

# Glob over every CSV file under the prefix; the path no longer carries secrets.
path_csv_spark = 's3n://{}/{}'.format(bucket_name, key_file)

# header: first line holds the column names; inferSchema: let Spark derive the
# column types from the data instead of reading everything as strings.
opts = {'header': True, 'sep': ',', 'inferSchema': True}
df_csv = spark.read.csv(
    path_csv_spark, **opts
)
df_csv.show()

+--------------------+---------------+-------------+------------------+---------------+------------+----------+----------------+------------------+----------+-------------------+
|                path|page_avg_scroll|page_avg_time|page_scroll_starts|page_total_time|page_uniques|page_views|page_views_loyal|page_views_quality|article_id|                dia|
+--------------------+---------------+-------------+------------------+---------------+------------+----------+----------------+------------------+----------+-------------------+
|archivo.gestion.p...|              0|           53|                 0|             53|           1|         1|               0|                 1|   2197561|2019-02-19 00:00:00|
|archivo.gestion.p...|            471|            6|                 0|             20|           3|         3|               0|                 1|     10000|2019-02-19 00:00:00|
|archivo.gestion.p...|           1224|           49|                 0|            691|          14|     

In [9]:
# Number of rows read from the CSV files. count() is a Spark action, so this
# cell triggers a read of all matched files.
df_csv.count()

193730

In [11]:
# Order the rows by day and replace nulls with 0 (an integer fill value only
# touches numeric columns).
# NOTE(review): a later groupBy shuffles the data, so this ordering is not
# guaranteed to carry over into per-group aggregations.
df_csv = df_csv.orderBy('dia').fillna(0)

In [56]:
def get_article_unique(df_spark, col='article_id'):
    """Return the distinct values of one column as a Python set.

    Only the requested column is selected and de-duplicated on the cluster
    before anything is collected, so the driver receives one value per
    distinct entry instead of the whole DataFrame.

    Args:
        df_spark: Spark DataFrame to inspect.
        col: Name of the column whose distinct values are wanted.

    Returns:
        set: The distinct values of ``col``. A null in the column shows up
        as ``None``; an empty DataFrame yields an empty set.
    """
    distinct_rows = df_spark.select(col).distinct().collect()
    # Each collected row holds exactly one field: the value of `col`.
    return {row[0] for row in distinct_rows}

In [57]:
%%time
# Distinct article ids on each side: the S3 CSV data and the Postgres table.
articles_in_csv, articles_in_db = (
    get_article_unique(frame) for frame in (df_csv, df_postgres)
)

print('df_csv: ', len(articles_in_csv))
print('df_postgres_local: ', len(articles_in_db))

df_csv:  53117
df_postgres_local:  0
CPU times: user 22 s, sys: 0 ns, total: 22 s
Wall time: 21.9 s


In [72]:
%%time
import collections
from pyspark.sql.functions import  monotonically_increasing_id
from pyspark.sql import functions as F

def add_article_news(set_id_csv, set_id_db, df_csv, df_db):
    """Aggregate the CSV rows of articles that are not yet in the database.

    Rows of ``df_csv`` whose ``article_id`` is in ``set_id_csv`` but not in
    ``set_id_db`` are grouped by ``(article_id, path)``. For every group the
    daily records are gathered into parallel arrays, one per column.

    The daily records are collected as whole structs and sorted by ``dia``
    before being split into arrays. ``collect_list`` gives no ordering
    guarantee after the shuffle of a ``groupby`` and skips nulls per column,
    so independent ``collect_list`` calls could return unordered or
    misaligned arrays.

    Args:
        set_id_csv: Set of article ids present in the CSV data.
        set_id_db: Set of article ids already stored in the database.
        df_csv: Spark DataFrame with one row per article, path and day.
        df_db: Spark DataFrame of the database table. Currently unused; kept
            so existing callers keep working.

    Returns:
        Spark DataFrame with columns ``article_id``, ``path``, ``order`` (the
        number of rows in the group), ``dia`` and one array column per
        metric, every array ordered by ``dia``.
    """
    metric_cols = [
        'page_avg_scroll', 'page_avg_time', 'page_scroll_starts',
        'page_total_time', 'page_uniques', 'page_views',
        'page_views_loyal', 'page_views_quality',
    ]
    new_articles = list(set_id_csv - set_id_db)

    grouped = df_csv.filter(
        F.col('article_id').isin(new_articles)
    ).groupby(
        ['article_id', 'path']
    ).agg(
        F.count(F.lit(1)).alias("order"),
        # Structs compare field by field, so putting `dia` first makes
        # sort_array order each group's records chronologically.
        F.sort_array(
            F.collect_list(F.struct('dia', *metric_cols))
        ).alias("_daily"),
    )

    # Selecting a field of an array<struct> column yields an array of that
    # field, which turns the sorted records back into parallel arrays.
    return grouped.select(
        'article_id', 'path', 'order',
        F.col('_daily.dia').alias('dia'),
        *[F.col('_daily.' + name).alias(name) for name in metric_cols]
    )

# Build the aggregated rows for the articles found in the CSV data but not in
# the database.
row_up = add_article_news(
    set_id_csv=articles_in_csv,
    set_id_db=articles_in_db,
    df_csv=df_csv,
    df_db=df_postgres,
)

CPU times: user 12.6 s, sys: 3.26 s, total: 15.8 s
Wall time: 1min 11s


In [68]:
# Print the schema of the aggregated rows returned by add_article_news.
row_up.printSchema()

root
 |-- article_id: integer (nullable = true)
 |-- path: string (nullable = true)
 |-- order: long (nullable = false)
 |-- collect_list(dia): array (nullable = true)
 |    |-- element: timestamp (containsNull = true)
 |-- collect_list(page_avg_scroll): array (nullable = true)
 |    |-- element: integer (containsNull = true)
 |-- collect_list(page_avg_time): array (nullable = true)
 |    |-- element: integer (containsNull = true)
 |-- collect_list(page_scroll_starts): array (nullable = true)
 |    |-- element: integer (containsNull = true)
 |-- collect_list(page_total_time): array (nullable = true)
 |    |-- element: integer (containsNull = true)
 |-- collect_list(page_uniques): array (nullable = true)
 |    |-- element: integer (containsNull = true)
 |-- collect_list(page_views): array (nullable = true)
 |    |-- element: integer (containsNull = true)
 |-- collect_list(page_views_loyal): array (nullable = true)
 |    |-- element: integer (containsNull = true)
 |-- collect_list(page_u

In [69]:
# Print the schema of the Postgres table again (presumably to compare it with
# row_up's schema before writing).
df_postgres.printSchema()

root
 |-- article_id: integer (nullable = true)
 |-- path: string (nullable = true)
 |-- order: integer (nullable = true)
 |-- dia: array (nullable = true)
 |    |-- element: timestamp (containsNull = true)
 |-- page_avg_time: array (nullable = true)
 |    |-- element: integer (containsNull = true)
 |-- page_avg_scroll: array (nullable = true)
 |    |-- element: integer (containsNull = true)
 |-- page_scroll_starts: array (nullable = true)
 |    |-- element: integer (containsNull = true)
 |-- page_total_time: array (nullable = true)
 |    |-- element: integer (containsNull = true)
 |-- page_views: array (nullable = true)
 |    |-- element: integer (containsNull = true)
 |-- page_views_loyal: array (nullable = true)
 |    |-- element: integer (containsNull = true)
 |-- page_views_quality: array (nullable = true)
 |    |-- element: integer (containsNull = true)



In [73]:
# Choose the JDBC save mode: replace the table when no article ids were read
# from the database, otherwise add the new rows to what is already there.
# NOTE(review): only the overwrite path adds an `id` column; appended rows are
# written without one. Confirm the table fills `id` by default before relying
# on the append path.
if not articles_in_db:
    print("OVERWRITE")
    # Unique (not consecutive) 64-bit ids generated by Spark.
    row_up = row_up.withColumn("id", monotonically_increasing_id())
    write_mode = 'overwrite'
else:
    print("APPEND")
    write_mode = 'append'

row_up.write.jdbc(mode=write_mode, **connection_db)

OVERWRITE
