In [1]:
from pyspark.sql import SparkSession

In [2]:
pip install nltk

Collecting nltk
  Downloading nltk-3.8.1-py3-none-any.whl (1.5 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.5/1.5 MB[0m [31m1.2 MB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m0m
[?25hCollecting regex>=2021.8.3
  Downloading regex-2023.12.25-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (773 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m773.4/773.4 kB[0m [31m1.8 MB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m
Installing collected packages: regex, nltk
Successfully installed nltk-3.8.1 regex-2023.12.25
Note: you may need to restart the kernel to use updated packages.


In [4]:
import pandas as pd
import re
import nltk
import mlflow
import numpy as np
from nltk.stem import WordNetLemmatizer
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix, classification_report, accuracy_score
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import LogisticRegression
from sklearn import metrics
from sklearn.ensemble import RandomForestClassifier
pd.set_option("display.max_colwidth", None)

In [5]:

spark = SparkSession.builder \
    .appName('Ingest checkin table into bronze') \
    .master('spark://spark-master:7077') \
    .config("hive.metastore.uris", "thrift://hive-metastore:9083")\
    .config("spark.hadoop.fs.s3a.access.key", 'minio') \
    .config("spark.hadoop.fs.s3a.secret.key", 'minio123') \
    .config("spark.hadoop.fs.s3a.endpoint", 'minio:9000')\
    .config("spark.hadoop.fs.s3a.path.style.access", "true")\
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")\
    .config('spark.hadoop.fs.s3a.aws.credentials.provider', 'org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider')\
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")\
    .config('spark.sql.warehouse.dir', f's3a://lakehouse/')\
    .enableHiveSupport()\
    .getOrCreate()

In [6]:
spark.sql('show schemas').show()

+---------+
|namespace|
+---------+
|  default|
|     gold|
|  platium|
|   silver|
+---------+



In [7]:
t="silver.cleaned_order_review"
df1 = spark.read.table(t)

In [8]:
df1.show(20,False)

+--------------------------------+--------------------------------+------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------------------+-----------------------+
|review_id                       |order_id                        |review_score|review_comment_message                                                                                                                                                     |review_creation_date|review_answer_timestamp|
+--------------------------------+--------------------------------+------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------------------+-----------------------+
|cde97e23deec3aa11e838402378706ee|512ba2efae3409f37660ce26199b32aa|1           |Não posso! Estou aguardand

In [9]:
count = df1.count()

In [10]:
print(count)

40977


In [11]:
df = df1.toPandas()

In [12]:
type(df)

pandas.core.frame.DataFrame

In [13]:
df.head(5)

Unnamed: 0,review_id,order_id,review_score,review_comment_message,review_creation_date,review_answer_timestamp
0,cde97e23deec3aa11e838402378706ee,512ba2efae3409f37660ce26199b32aa,1,"Não posso! Estou aguardando a chegada do produto que comprei!Logo que isso aconteça, terei minha opinião formada!",2017-12-22 00:00:00,2017-12-22 12:20:27
1,de7648e683482829b3ac3ba3a4ae64a0,21dd600edd64e225d04af3670d1803b3,5,Bom produto. Entrega rápida. Recomendo,2018-05-15 00:00:00,2018-05-19 22:06:57
2,b7d03880a3ed8bf093567e511703378d,f01954cab76ef5d938d291d2f1d201e5,1,"Não foi entregue, a primeira vez que acontece isso.",2017-05-18 00:00:00,2017-05-18 23:27:28
3,b2a694b42ac51fe76833c5d0cc0a6285,5bfad1932352fcd447ddb67c215b5552,5,"OTIMO ,",2018-03-06 00:00:00,2018-03-20 19:44:26
4,d92095a461b40afed6baeafbd158c3ce,d3537c571b02f225be1bc2d3b936beff,5,tudo aconteceu como o previsto,2017-12-22 00:00:00,2017-12-24 11:04:20


In [16]:
df_comments = df.loc[:, ["review_score", "review_comment_message"]]
df_comments.columns = ["score", "comment"]
df_comments.head(10)

Unnamed: 0,score,comment
0,1,"Não posso! Estou aguardando a chegada do produto que comprei!Logo que isso aconteça, terei minha opinião formada!"
1,5,Bom produto. Entrega rápida. Recomendo
2,1,"Não foi entregue, a primeira vez que acontece isso."
3,5,"OTIMO ,"
4,5,tudo aconteceu como o previsto
5,1,O tamanho é pequeno e percebi depois que no anúncio não dá destaque as dimensões do produto. Deixa a desejar
6,4,Entrega rápida e produto muito bacana!!!
7,4,"Boa qualidade, chegou no tempo previsto, tudo certinho..."
8,1,não veio na cor q eu fiz a compra
9,2,Já foi protocolizada junto ao stark pedido de cancelamento da compra. Estou aguardando resposta do responsável.


In [17]:
lemmatizer = WordNetLemmatizer()
def clean_text(text):
    text = text.lower()
    # text = re.sub(
    #     r"[^a-zA-Z?.!,¿\s]+", " ", text
    # )  
    text = re.sub(r"http\S+", "", text)  
    html = re.compile(r"<.*?>")
    text = html.sub(r"", text) 
    punctuations = "@#!?+&*[]-%.:/();$=><|{}^" + "'`" + "_"
    for p in punctuations:
        text = text.replace(p, "")  
    #text = [word.lower() for word in text.split() if word.lower() not in sw]
    text = [word.lower() for word in text.split()]
    text = [lemmatizer.lemmatize(word) for word in text]
    text = " ".join(text) 
    emoji_pattern = re.compile("["
                           u"\U0001F600-\U0001F64F"  # emoticons
                           u"\U0001F300-\U0001F5FF"  # symbols & pictographs
                           u"\U0001F680-\U0001F6FF"  # transport & map symbols
                           u"\U0001F1E0-\U0001F1FF"  # flags (iOS)
                           u"\U00002702-\U000027B0"
                           u"\U000024C2-\U0001F251"
                           "]+", flags=re.UNICODE)
    text = emoji_pattern.sub(r'', text) #Removing emojis

    return text

In [19]:
nltk.download('wordnet')

[nltk_data] Downloading package wordnet to /home/jovyan/nltk_data...


True

In [20]:
df_comments["comment"] = df_comments["comment"].apply(lambda x: clean_text(x))

df_comments.head(20)

Unnamed: 0,score,comment
0,1,"não posso estou aguardando a chegada do produto que compreilogo que isso aconteça, terei minha opinião formada"
1,5,bom produto entrega rápida recomendo
2,1,"não foi entregue, a primeira vez que acontece isso"
3,5,"otimo ,"
4,5,tudo aconteceu como o previsto
5,1,o tamanho é pequeno e percebi depois que no anúncio não dá destaque a dimensões do produto deixa a desejar
6,4,entrega rápida e produto muito bacana
7,4,"boa qualidade, chegou no tempo previsto, tudo certinho"
8,1,não veio na cor q eu fiz a compra
9,2,já foi protocolizada junto ao stark pedido de cancelamento da compra estou aguardando resposta do responsável


In [21]:
mapping = {1: "negative", 2: "negative", 3: "negative", 4: "positive", 5: "positive"}
df_comments["score"] = df_comments["score"].map(mapping)

In [22]:
text_vectorizer = TfidfVectorizer(
    max_features=20000, use_idf=True, smooth_idf=True
)

In [23]:
X = df_comments["comment"]
y = df_comments["score"]
X_pre = text_vectorizer.fit_transform(X)

X_train, X_test, y_train, y_test = train_test_split(
    X_pre, y, stratify=y, train_size=0.8, random_state=22
)
X_train.shape
print(X_train.shape, X_test.shape, y_train.shape, y_test.shape)

(32781, 17863) (8196, 17863) (32781,) (8196,)


In [24]:
mlflow.set_tracking_uri('http://mlflow_server:5000')
experiment_name = "experiment_0903"
mlflow.set_experiment(experiment_name)
mlflow.start_run()
mlflow.sklearn.autolog()

log = LogisticRegression(max_iter=1000,solver='liblinear',C=1.5,penalty='l1')
log.fit(X_train, y_train)
y_pred = log.predict(X_test)
acc = accuracy_score(y_test, y_pred)
#cm = confusion_matrix(y_pred,y_test)
#np.save("confusion_matrix.npy", cm)

    #mlflow.log_artifact("confusion_matrix.npy")
mlflow.log_metric("accuracy", acc)
    #mlflow.log_param("max_iter", log.max_iter)
    #mlflow.log_param("solver", log.solver)
    #mlflow.log_param("C", log.C)
    #mlflow.log_param("penalty", log.penalty)
    #mlflow.sklearn.log_model(log, "sentiment_class")
mlflow.sklearn.log_model(text_vectorizer, "text_vectorizer")
mlflow.end_run()



In [8]:
#spark.sql('SELECT * FROM silver.cleaned_customer').show(10,False)

+--------------------------------+--------------------------------+------------------------+--------------+--------------+
|customer_id                     |customer_unique_id              |customer_zip_code_prefix|customer_city |customer_state|
+--------------------------------+--------------------------------+------------------------+--------------+--------------+
|0054556ea954a76ad6f9c4ba79d34a98|3cc8e80baa86a7befe2b24568cd0faf0|95680                   |canela        |SP            |
|009bd844996868ab5166ade7dd9ab471|63df52c362d4b7183bb6a463452a8183|13076                   |campinas      |SP            |
|00abf30c1a93c7c8b509cb80a22e4dd8|d43e7cbf7354f1f46a7a1b30701017b3|6226                    |osasco        |SP            |
|02625456293ab29f0b11a84835a8c0ab|570eb70ff97166b85ea96be3bfb65fef|31930                   |belo horizonte|MG            |
|028514f8be6e8c2adb9d0b4647ef3a39|be4d46b2c26abc0280f42d8325aa54a0|35200                   |aimores       |MG            |
|033225f6250f5eb

In [25]:
logged_model = 'runs:/dae0d9e1a0a642d1afd024f71a4db0ac/model'

# Load model as a PyFuncModel.
loaded_model = mlflow.pyfunc.load_model(logged_model)

Downloading artifacts:   0%|          | 0/5 [00:00<?, ?it/s]

In [9]:
#spark.sql("UPDATE silver.cleaned_customer SET customer_state = 'SP' WHERE customer_id = '0054556ea954a76ad6f9c4ba79d34a98'")

DataFrame[num_affected_rows: bigint]

In [13]:
#spark.sql("SELECT count(*) FROM silver.cleaned_customer").show()

+--------+
|count(1)|
+--------+
|   99441|
+--------+



In [16]:
#spark.sql("SELECT DISTINCT(customer_id) FROM silver.cleaned_customer").show(5,False)

+--------------------------------+
|customer_id                     |
+--------------------------------+
|36a1aa63bf2ebcd4911e026092700610|
|512f27d822abe6af95d86529e73724a6|
|384fbbcdcf45c174ca6407d4ade90112|
|4632eb5a8f175f6fe020520ae0c678f3|
|174cf4e5e95b5a49bac9cee9ef6cef70|
+--------------------------------+
only showing top 5 rows

