In [None]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import udf
from pyspark.sql.functions import *
from pyspark.sql.window import Window
NoneType = type(None)
import os
import socket
import hashlib
import string

import time
from osgeo import ogr
import geopandas as gpd
from pyspark.sql import SparkSession
from sedona.register import SedonaRegistrator
from sedona.utils import SedonaKryoRegistrator, KryoSerializer

In [None]:
def createMd5(text):
    """Return the hexadecimal MD5 digest of ``text`` encoded as UTF-8.

    Args:
        text: String to hash, or ``None``.

    Returns:
        The 32-character lowercase hex digest, or ``None`` when ``text`` is
        ``None`` (a null column value reaches a Python UDF as ``None``; without
        this guard ``None.encode`` raises ``AttributeError``).
    """
    if text is None:
        return None
    return hashlib.md5(text.encode('utf-8')).hexdigest()
# Spark UDF wrapper around createMd5; the result column is a string.
md5Udf = udf(lambda value: createMd5(value), returnType=StringType())

# Deletion table for the punctuation stripped by clean_lower. Built once at
# definition time instead of on every call.
_CLEAN_LOWER_TABLE = str.maketrans('', '', '!"#$%&\'()*+,./:;<=>?@[\\]^`{|}~-_”“«»‘')


def clean_lower(text):
    """Strip punctuation, lowercase, and collapse whitespace in ``text``.

    Args:
        text: String to normalise, or ``None``.

    Returns:
        The cleaned string with words separated by single spaces and no
        leading/trailing whitespace, or ``None`` when ``text`` is ``None``
        (previously this raised ``AttributeError``).
    """
    if text is None:
        return None
    sentence = text.translate(_CLEAN_LOWER_TABLE).lower()
    # split() with no argument splits on any whitespace run and drops empties.
    return " ".join(sentence.split())
# Spark UDF wrapper around clean_lower; the result column is a string.
cleanLowerUdf = udf(lambda value: clean_lower(value), returnType=StringType())

def get_site_from_url(text):
    """Return the third ``/``-separated segment of ``text``.

    For a URL of the form ``scheme://host/path`` that segment is the host part.

    Args:
        text: URL string, or ``None``.

    Returns:
        The segment between the second and third ``/``; ``None`` when ``text``
        is ``None`` or contains fewer than two ``/`` characters (previously
        these cases raised ``AttributeError`` / ``IndexError``).
    """
    if text is None:
        return None
    # Only the first three segments matter, so stop splitting after them.
    parts = text.split("/", 3)
    if len(parts) < 3:
        return None
    return parts[2]
# Spark UDF wrapper around get_site_from_url; the result column is a string.
getUrl = udf(lambda value: get_site_from_url(value), returnType=StringType())


In [None]:
# Resolve the 'minio' host name once; the S3A endpoint below is built from the
# resulting IP address.
minio_ip = socket.gethostbyname('minio')

# Build (or reuse) the Spark session with Kryo/Sedona serialisation and S3A
# access to the MinIO endpoint.
#
# Fixes relative to the previous version:
#   * 'spark.dirver.maxResultSize' was a typo, so the 5g limit was never
#     applied; the key is now 'spark.driver.maxResultSize'.
#   * The S3A credentials can be supplied through the MINIO_ACCESS_KEY /
#     MINIO_SECRET_KEY environment variables (same approach as the Postgres
#     password read from SHARED_PASSWORD). The previous literals remain the
#     fallback so existing setups keep working.
spark = (
    SparkSession.builder
    .appName("Python Spark S3")
    .config("spark.serializer", KryoSerializer.getName)
    .config("spark.executor.memory", "80g")
    .config("spark.driver.memory", "80g")
    .config("spark.driver.maxResultSize", "5g")
    .config("spark.kryo.registrator", SedonaKryoRegistrator.getName)
    .config("spark.hadoop.fs.s3a.endpoint", "http://" + minio_ip + ":9000")
    .config("spark.hadoop.fs.s3a.access.key",
            os.environ.get("MINIO_ACCESS_KEY", "minio-access-key"))
    .config("spark.hadoop.fs.s3a.secret.key",
            os.environ.get("MINIO_SECRET_KEY", "minio-secret-key"))
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .config("spark.jars.packages",
            "org.apache.sedona:sedona-python-adapter-3.0_2.12:1.0.0-incubating,org.datasyslab:geotools-wrapper:geotools-24.0")
    .getOrCreate()
)

# Register Sedona's SQL functions and types on this session.
SedonaRegistrator.registerAll(spark)

In [None]:

# Explicit schema applied when reading the article JSON files: every field is
# a nullable string except publish_date, which is read as a timestamp.
st = (
    StructType()
    .add("abstract", StringType())
    .add("authors", StringType())
    .add("image", StringType())
    .add("metadata", StringType())
    .add("publish_date", TimestampType())
    .add("text", StringType())
    .add("title", StringType())
    .add("url", StringType())
)

In [None]:
# Load every JSON file under the covid_mexico prefix using the explicit schema;
# timestamps (publish_date) are parsed with the dd-MM-yyyy pattern.
df_news_covid_mexico = (
    spark.read
    .schema(st)
    .option("timestampFormat", "dd-MM-yyyy")
    .json("s3a://news/covid_mexico/*.json")
)

In [None]:
# Spark action: runs the read and returns the number of rows loaded.
df_news_covid_mexico.count()

In [None]:
# Print the column names and types of the loaded DataFrame.
df_news_covid_mexico.printSchema()

In [None]:
# Preview the first 10 rows of the raw articles.
df_news_covid_mexico.show(10)

In [None]:
# Derive the per-article working set: an MD5 id of the URL, the cleaned
# lower-case body text and the site segment extracted from the URL.
#
# The length filter is applied BEFORE the select: `text` is not one of the
# selected columns, so filtering afterwards relied on Spark resolving a column
# that the projection had already dropped, and allowed the Python UDFs to run
# on rows with a null `text`. The resulting rows are the same.
df_news_covid_mexico_date_text = (
    df_news_covid_mexico
    .filter("length(text) >= 2")
    .select(
        md5Udf("url").alias("article_id"),
        "title",
        "url",
        "publish_date",
        cleanLowerUdf("text").alias("clean_text"),
        getUrl("url").alias("site"),
    )
)

In [None]:
# Preview the first 15 rows of the derived article table.
df_news_covid_mexico_date_text.show(15)

In [None]:
# Spark action: number of articles that passed the text-length filter.
df_news_covid_mexico_date_text.count()

In [None]:
# Show the first 15 titles in full (second argument False disables truncation).
df_news_covid_mexico_date_text.select("title").show(15,False)

In [None]:
# JDBC connection settings used by the table writes below. The password is
# taken from the SHARED_PASSWORD environment variable (KeyError if it is unset).
url = "jdbc:postgresql://postgres/shared"
mode = "overwrite"
properties = dict(
    user="shared",
    password=os.environ['SHARED_PASSWORD'],
)

In [None]:
# Persist the derived article table over JDBC using the configured save mode.
(
    df_news_covid_mexico_date_text.write
    .mode(mode)
    .jdbc(
        url=url,
        table="tb_news_covid_mexico_date_text",
        properties=properties,
    )
)

In [None]:
# One row per (article, word): split the cleaned text on whitespace, explode
# the resulting array, and keep only tokens longer than one character.
#
# The pattern is written as a raw string: "\s" in a normal string literal is an
# invalid escape sequence that newer Python versions warn about. The string
# value passed to Spark is unchanged.
df_news_covid_mexico_palabras = (
    df_news_covid_mexico_date_text
    .select(
        "article_id",
        "publish_date",
        explode(split(df_news_covid_mexico_date_text.clean_text, r"\s")).alias("palabra"),
    )
    .where(length('palabra') > 1)
)

In [None]:
# Preview the first 30 (article, word) rows.
df_news_covid_mexico_palabras.show(30)

In [None]:
#https://sigdelta.com/blog/word-count-in-spark-with-a-pinch-of-tf-idf/
# Count occurrences of each word per article and display the 25 largest counts.
(
    df_news_covid_mexico_palabras
    .groupBy('article_id', 'palabra', 'publish_date')
    .count()
    .orderBy(desc('count'))
    .show(25)
)

In [None]:
#https://sigdelta.com/blog/word-count-in-spark-with-a-pinch-of-tf-idf-continued/
# Term frequency per (article, word):
#   n_w = number of rows for the word within the article group
#   n_d = sum of those counts over the article_id window
#   tf  = n_w / n_d
w = Window.partitionBy(df_news_covid_mexico_palabras['article_id'])

article_tf = (
    df_news_covid_mexico_palabras
    .groupBy('article_id', 'palabra', 'publish_date')
    .agg(
        count('*').alias('n_w'),
        sum(count('*')).over(w).alias('n_d'),
        (count('*') / sum(count('*')).over(w)).alias('tf'),
    )
    .orderBy(desc('n_w'))
    .cache()
)

article_tf.show(truncate=15)

In [None]:
# Inverse document frequency per (word, article):
#   c_d = number of distinct article_id values (computed eagerly on the driver)
#   i_d = count over the 'palabra' window of the grouped rows
#   idf = log(c_d / i_d)
w = Window.partitionBy('palabra')

c_d = df_news_covid_mexico_palabras.select('article_id').distinct().count()

article_idf = (
    df_news_covid_mexico_palabras
    .groupBy('palabra', 'article_id', 'publish_date')
    .agg(
        lit(c_d).alias('c_d'),
        count('*').over(w).alias('i_d'),
        log(lit(c_d) / count('*').over(w)).alias('idf'),
    )
    .orderBy(desc('idf'))
    .cache()
)

In [None]:
# Preview 150 IDF rows, truncating cell values to 15 characters.
article_idf.show(150, truncate=15)

In [None]:
# Combine TF and IDF into tf_idf = tf * idf for every (article, word).
#
# publish_date is part of the join key and is matched with eqNullSafe: a plain
# equi-join never matches null to null, so every row with a null publish_date
# was silently dropped from the result. Output columns and their order are the
# same as with the column-name join: article_id, palabra, publish_date, n_w,
# n_d, tf, c_d, i_d, idf, tf_idf.
article_tfidf = (
    article_tf.alias('tf_side')
    .join(
        article_idf.alias('idf_side'),
        (col('tf_side.article_id') == col('idf_side.article_id'))
        & (col('tf_side.palabra') == col('idf_side.palabra'))
        & col('tf_side.publish_date').eqNullSafe(col('idf_side.publish_date')),
    )
    .select('tf_side.*', 'idf_side.c_d', 'idf_side.i_d', 'idf_side.idf')
    .withColumn('tf_idf', col('tf') * col('idf'))
    .cache()
)

In [None]:
# Show the 150 rows with the highest tf_idf, truncating cell values to 12 characters.
article_tfidf.orderBy('tf_idf', ascending=False).show(150,truncate=12)

In [None]:
# Rank the words of each article by descending tf_idf and keep ranks 1..15.
# rank() assigns the same rank to ties, so an article can contribute more than
# 15 rows when several words share a tf_idf value.
w = Window.partitionBy('article_id').orderBy(col('tf_idf').desc())

article_tfidf_top_15 = (
    article_tfidf
    .withColumn('rank', rank().over(w))
    .where(col('rank') <= 15)
    .orderBy('article_id', 'tf_idf', 'n_w')
    # The explicit column list also leaves out the helper 'rank' column.
    .select('article_id', 'publish_date', 'palabra', 'n_w', 'tf_idf')
)

In [None]:
# Preview 30 rows of the per-article top words, truncating cell values to 12 characters.
article_tfidf_top_15.show(truncate=12, n=30)

In [None]:
# Attach the site of each article to its top words.
#
# publish_date is matched with eqNullSafe: a plain equi-join never matches
# null to null, so words of articles with a null publish_date were silently
# dropped. Output columns are unchanged: article_id, publish_date, site,
# palabra, n_w, tf_idf.
article_tfidf_top_15_site = (
    article_tfidf_top_15.alias('top')
    .join(
        df_news_covid_mexico_date_text.alias('art'),
        (col('top.article_id') == col('art.article_id'))
        & col('top.publish_date').eqNullSafe(col('art.publish_date')),
    )
    .select(
        'top.article_id',
        'top.publish_date',
        'art.site',
        'top.palabra',
        'top.n_w',
        'top.tf_idf',
    )
)

In [None]:
# Preview the first 15 rows of the top-words-with-site table.
article_tfidf_top_15_site.show(15)

In [None]:
# Persist the top-words-with-site table over JDBC using the configured save mode.
(
    article_tfidf_top_15_site.write
    .mode(mode)
    .jdbc(
        url=url,
        table="tb_news_covid_mexico_palabras_top_tfidf",
        properties=properties,
    )
)