In [69]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

In [70]:
orders = pd.read_csv('mart_overall.csv')
orders.shape

(98347, 18)

In [71]:
orders.columns

Index(['order_id', 'product_id', 'customer_id', 'seller_id', 'order_status',
       'order_purchase_timestamp', 'order_approved_at',
       'order_delivered_carrier_date', 'order_delivered_customer_date',
       'order_estimated_delivery_date', 'days_to_delivery',
       'estimated_days_to_delivery', 'delay_flag_estimated', 'delay_flag_avg',
       'price', 'freight_value', 'quantity', 'distance'],
      dtype='object')

In [72]:
seller = pd.read_csv('dim_sellers.csv')
customer = pd.read_csv('dim_customers.csv')
orders = pd.merge(orders, seller, on='seller_id', how='left')
orders = pd.merge(orders, customer, on='customer_id', how='left')
orders.head()

Unnamed: 0,order_id,product_id,customer_id,seller_id,order_status,order_purchase_timestamp,order_approved_at,order_delivered_carrier_date,order_delivered_customer_date,order_estimated_delivery_date,...,seller_lat,seller_lng,seller_region,customer_unique_id,customer_zip_code_prefix,customer_city,customer_state,customer_lat,customer_lng,customer_region
0,e481f51cbdc54678b7cc49136f2d6af7,87285b34884572647811a353c7ac498a,9ef432eb6251297304e76186b10a928d,3504c0cb71d7fa48d967e0e4c94d59d9,delivered,2017-10-02 10:56:33,2017-10-02 11:07:15,2017-10-04 19:55:00,2017-10-10 21:25:13,2017-10-18,...,-23.680729,-46.444238,남동부,7c396fd4830fd04220f754e42b4e5bff,3149,sao paulo,SP,-23.576983,-46.587161,남동부
1,53cdb2fc8bc7dce0b6741e2150273451,595fac2a385ac33a80bd5114aec74eb8,b0830fb4747a6c6d20dea0b8c802d7ef,289cdb325fb7e7f891c38608bf9e0962,delivered,2018-07-24 20:41:37,2018-07-26 03:24:27,2018-07-26 14:31:00,2018-08-07 15:27:45,2018-08-13,...,-19.807681,-43.980427,남동부,af07308b275d755c9edb36a90c618231,47813,barreiras,TO,-12.135259,-45.005406,북부
2,53cdb2fc8bc7dce0b6741e2150273451,595fac2a385ac33a80bd5114aec74eb8,b0830fb4747a6c6d20dea0b8c802d7ef,289cdb325fb7e7f891c38608bf9e0962,delivered,2018-07-24 20:41:37,2018-07-26 03:24:27,2018-07-26 14:31:00,2018-08-07 15:27:45,2018-08-13,...,-19.807681,-43.980427,남동부,af07308b275d755c9edb36a90c618231,47813,barreiras,BA,-12.945892,-38.456194,북동부
3,53cdb2fc8bc7dce0b6741e2150273451,595fac2a385ac33a80bd5114aec74eb8,b0830fb4747a6c6d20dea0b8c802d7ef,289cdb325fb7e7f891c38608bf9e0962,delivered,2018-07-24 20:41:37,2018-07-26 03:24:27,2018-07-26 14:31:00,2018-08-07 15:27:45,2018-08-13,...,-19.807681,-43.980427,남동부,af07308b275d755c9edb36a90c618231,47813,barreiras,TO,-12.135259,-45.005406,북부
4,53cdb2fc8bc7dce0b6741e2150273451,595fac2a385ac33a80bd5114aec74eb8,b0830fb4747a6c6d20dea0b8c802d7ef,289cdb325fb7e7f891c38608bf9e0962,delivered,2018-07-24 20:41:37,2018-07-26 03:24:27,2018-07-26 14:31:00,2018-08-07 15:27:45,2018-08-13,...,-19.807681,-43.980427,남동부,af07308b275d755c9edb36a90c618231,47813,barreiras,BA,-12.945892,-38.456194,북동부


In [73]:
reviews = pd.read_csv('dim_review.csv')
reviews.shape

(99224, 10)

In [74]:
orders = orders.merge(reviews, on='order_id')

In [79]:
orders.columns

Index(['order_id', 'product_id', 'customer_id', 'seller_id', 'order_status',
       'order_purchase_timestamp', 'order_approved_at',
       'order_delivered_carrier_date', 'order_delivered_customer_date',
       'order_estimated_delivery_date', 'days_to_delivery',
       'estimated_days_to_delivery', 'delay_flag_estimated', 'delay_flag_avg',
       'price', 'freight_value', 'quantity', 'distance',
       'seller_zip_code_prefix', 'seller_city', 'seller_state', 'seller_lat',
       'seller_lng', 'seller_region', 'customer_unique_id',
       'customer_zip_code_prefix', 'customer_city', 'customer_state',
       'customer_lat', 'customer_lng', 'customer_region', 'review_id',
       'review_score', 'review_comment_title', 'review_comment_message',
       'review_creation_date', 'review_answer_timestamp',
       'review_comment_message_tokens', 'sentimental', 'ngrams'],
      dtype='object')

In [80]:
orders.to_csv('mart_reviews.csv', index=False)

In [81]:
from pyspark.sql import SparkSession
# Spark 세션 생성
spark = SparkSession.builder.appName("olist").getOrCreate()

spk_orders = spark.read.csv('mart_reviews.csv', header=True, inferSchema=True)
spk_orders.createOrReplaceTempView("orders")

                                                                                

- 셀러와 구매자가 같은 주인 그룹 (A) / 셀러와 구매자가 다른 주인 그룹 (B)으로 나누어서 배송기간과 리뷰 점수

In [82]:
from pyspark.sql.functions import col, when, count, isnan, isnull, mean, lit, sum, avg, round
spk_orders = spk_orders.withColumn("same_state", when(col("seller_state") == col("customer_state"), "Yes").otherwise("No"))

In [83]:
spk_orders = spk_orders.withColumn("delay_flag_estimated",when(col("delay_flag_estimated") == 1, "Yes").otherwise("No"))


In [84]:
spk_orders = spk_orders.withColumn("delay_flag_avg",when(col("delay_flag_avg") == 1, "Yes").otherwise("No"))


In [85]:
spk_orders.show()

+--------------------+--------------------+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+----------------+--------------------------+--------------------+--------------+------+-------------+--------+------------------+----------------------+--------------------+------------+-------------------+-------------------+-------------+--------------------+------------------------+--------------------+--------------+-------------------+-------------------+---------------+--------------------+------------+--------------------+----------------------+--------------------+-----------------------+-----------------------------+-----------+--------------------+----------+
|            order_id|          product_id|         customer_id|           seller_id|order_status|order_purchase_timestamp|  order_approved_at|order_delivered_carrier_date|order_delivered_cust

In [86]:
# Create a temporary view from the DataFrame
spk_orders.createOrReplaceTempView("df_view")

query = """
    SELECT AVG(review_score) as avg_review_score,COUNT(DISTINCT order_id) as cnt, same_state ,AVG(estimated_days_to_delivery) as avg_estimated_days_to_delivery , AVG(days_to_delivery) as avg_days_to_delivery
    FROM df_view
    GROUP BY same_state
    ORDER BY same_state
"""
spark.sql(query).show()



+-----------------+-----+----------+------------------------------+--------------------+
| avg_review_score|  cnt|same_state|avg_estimated_days_to_delivery|avg_days_to_delivery|
+-----------------+-----+----------+------------------------------+--------------------+
|4.206247767236931|56047|        No|            25.391590256552746|  12.180248481721113|
|4.273393553205891|33692|       Yes|            16.300585860000467|   6.501085358168195|
+-----------------+-----+----------+------------------------------+--------------------+



                                                                                

In [87]:
query = """
    SELECT AVG(review_score) as avg_review_score
    FROM df_view
"""
spark.sql(query).show()

+----------------+
|avg_review_score|
+----------------+
|4.23053993345831|
+----------------+



In [88]:
# Create a temporary view from the DataFrame
spk_orders.createOrReplaceTempView("df_view")

query = """
    SELECT  same_state ,delay_flag_estimated,COUNT(DISTINCT order_id) as cnt
    FROM df_view
    GROUP BY same_state,delay_flag_estimated
    ORDER BY same_state,delay_flag_estimated
"""
spark.sql(query).show()



+----------+--------------------+-----+
|same_state|delay_flag_estimated|  cnt|
+----------+--------------------+-----+
|        No|                  No|54075|
|        No|                 Yes| 1972|
|       Yes|                  No|32104|
|       Yes|                 Yes| 1588|
+----------+--------------------+-----+



                                                                                

In [89]:
df = spk_orders

In [90]:
import scipy.stats as stats
pdf = df.select('review_score', 'days_to_delivery').toPandas()

# Convert review_score to numeric
pdf['review_score'] = pd.to_numeric(pdf['review_score'])

corr, p_value = stats.pearsonr(pdf['review_score'], pdf['days_to_delivery'])
print("Correlation: ", corr)
print("P-value: ", p_value)



Correlation:  -0.17466135223796891
P-value:  0.0


- 예상과 다르게 배송기간과 만족도와 유의미한 상관관계가 없다.
- p_val이 0.05보다 작아 통계적으로 유의미하다 볼 수 있으나, 다른 요인에 의해 강한 영향을 받을것 같다.

In [98]:
# Convert Spark DataFrame to Pandas DataFrame
pdf = df.select('review_score', 'delay_flag_estimated','delay_flag_avg','same_state').toPandas()

A_grouped = pdf.loc[pdf['delay_flag_estimated'] == 'Yes']['review_score'].astype(float)
B_grouped = pdf.loc[pdf['delay_flag_estimated'] == 'No']['review_score'].astype(float)

t_stat , p_value = stats.ttest_ind(A_grouped, B_grouped)
print("T-statistic: ", t_stat)
print("P-value: ", p_value)
print("지연 배송 평균 리뷰 점수:", A_grouped.mean())
print("정상 배송 평균 리뷰 점수:", B_grouped.mean())


T-statistic:  -67.03772226176206
P-value:  0.0
지연 배송 평균 리뷰 점수: 3.1079958463136035
정상 배송 평균 리뷰 점수: 4.278116665346325


In [99]:

A_grouped = pdf.loc[pdf['delay_flag_avg'] == 'Yes']['review_score'].astype(float)
B_grouped = pdf.loc[pdf['delay_flag_avg'] == 'No']['review_score'].astype(float)

t_stat , p_value = stats.ttest_ind(A_grouped, B_grouped)
print("T-statistic: ", t_stat)
print("P-value: ", p_value)
print("지연 배송 평균 리뷰 점수:", A_grouped.mean())
print("정상 배송 평균 리뷰 점수:", B_grouped.mean())


T-statistic:  -42.17078032500179
P-value:  0.0
지연 배송 평균 리뷰 점수: 4.051297665042836
정상 배송 평균 리뷰 점수: 4.35111161332241


In [101]:
# 4가지 그룹으로 분류
A_grouped = pdf.loc[(pdf['delay_flag_estimated'] == 'Yes') & (pdf['same_state'] == 'No'), 'review_score'].astype(float)
B_grouped = pdf.loc[(pdf['delay_flag_estimated'] == 'Yes') & (pdf['same_state'] == 'Yes'), 'review_score'].astype(float)
C_grouped = pdf.loc[(pdf['delay_flag_estimated'] == 'No') & (pdf['same_state'] == 'No'), 'review_score'].astype(float)
D_grouped = pdf.loc[(pdf['delay_flag_estimated'] == 'No') & (pdf['same_state'] == 'Yes'), 'review_score'].astype(float)

# 평균 비교
print("A (지연 배송 & 다른 주) 평균 리뷰 점수:", A_grouped.mean())
print("B (지연 배송 & 같은 주) 평균 리뷰 점수:", B_grouped.mean())
print("C (정상 배송 & 다른 주) 평균 리뷰 점수:", C_grouped.mean())
print("D (정상 배송 & 같은 주) 평균 리뷰 점수:", D_grouped.mean())


A (지연 배송 & 다른 주) 평균 리뷰 점수: 3.0862745098039217
B (지연 배송 & 같은 주) 평균 리뷰 점수: 3.138308457711443
C (정상 배송 & 다른 주) 평균 리뷰 점수: 4.249416000219859
D (정상 배송 & 같은 주) 평균 리뷰 점수: 4.329267994024441


- 예상 배송 기간에 비해 실제 배송 기간이 더 긴 경우 실제 배송기간과 상관없이 만족도가 현저히 떨어짐을 알 수 있다.
- 마찬가지로 평균 배송기간에 비해 실제 배송 기간이 더 긴 경우에도 만족도가 떨어짐을 알 수 있다.

In [None]:

query = """
        SELECT seller_region , COUNT(DISTINCT order_id) as cnt
    FROM df_view
    GROUP BY seller_region
    ORDER BY 2 DESC
"""
spark.sql(query).show()

[Stage 117:====>                                                  (1 + 11) / 12]

+-------------+-----+
|seller_region|  cnt|
+-------------+-----+
|       남동부|74452|
|         남부|12357|
|       중서부| 1546|
|       북동부| 1431|
|         북부|   33|
+-------------+-----+



                                                                                

In [103]:

query = """
        SELECT customer_region , COUNT(DISTINCT order_id) as cnt
    FROM df_view
    GROUP BY customer_region
    ORDER BY 2 DESC
"""
spark.sql(query).show()

+---------------+-----+
|customer_region|  cnt|
+---------------+-----+
|         남동부|62372|
|           남부|12870|
|         북동부| 7455|
|         중서부| 5523|
|           북부| 1369|
+---------------+-----+



In [104]:
query = """
    SELECT seller_region , customer_region, COUNT(DISTINCT order_id) as cnt
    FROM df_view
    GROUP BY seller_region,customer_region
    ORDER BY 3 DESC
"""
spark.sql(query).show()

+-------------+---------------+-----+
|seller_region|customer_region|  cnt|
+-------------+---------------+-----+
|       남동부|         남동부|53096|
|       남동부|           남부| 9656|
|         남부|         남동부| 7881|
|       남동부|         북동부| 6252|
|       남동부|         중서부| 4567|
|         남부|           남부| 3043|
|       남동부|           북부| 1167|
|       중서부|         남동부| 1011|
|       북동부|         남동부|  759|
|         남부|         북동부|  676|
|         남부|         중서부|  658|
|       북동부|         북동부|  406|
|       중서부|         중서부|  213|
|       중서부|         북동부|  160|
|         남부|           북부|  141|
|       중서부|           남부|  132|
|       북동부|           남부|  130|
|       북동부|         중서부|  115|
|       중서부|           북부|   37|
|       북동부|           북부|   29|
+-------------+---------------+-----+
only showing top 20 rows



In [119]:
query = """
    SELECT seller_region , customer_region,ROUND(SUM(CASE WHEN delay_flag_estimated = "Yes" THEN 1 ELSE 0 END)/COUNT(DISTINCT order_id)*100.0,2) as delay_proportion
    ,COUNT(DISTINCT order_id) as cnt
    FROM df_view
    WHERE seller_region != '북부'
    GROUP BY seller_region,customer_region
    ORDER BY 3 DESC
"""
spark.sql(query).show()


+-------------+---------------+----------------+-----+
|seller_region|customer_region|delay_proportion|  cnt|
+-------------+---------------+----------------+-----+
|       중서부|           북부|           21.62|   37|
|       북동부|         중서부|           15.65|  115|
|       북동부|           남부|           13.08|  130|
|       중서부|         중서부|            9.86|  213|
|       남동부|         중서부|            8.19| 4567|
|       북동부|         북동부|             6.9|  406|
|       중서부|         북동부|            6.25|  160|
|       남동부|         남동부|            6.08|53096|
|       중서부|           남부|            6.06|  132|
|       남동부|         북동부|            4.96| 6252|
|       중서부|         남동부|            4.75| 1011|
|       남동부|           북부|            4.63| 1167|
|       북동부|         남동부|            4.61|  759|
|       남동부|           남부|            3.79| 9656|
|       북동부|           북부|            3.45|   29|
|         남부|           남부|            3.02| 3043|
|         남부|         북동부|            2.66|

In [120]:
query = """
    SELECT seller_region , customer_region, COUNT(DISTINCT order_id) as cnt , ROUND(AVG(distance),2) as avg_distance
    FROM df_view
    WHERE delay_flag_estimated = 'Yes'
    GROUP BY seller_region,customer_region
    ORDER BY 3 DESC
"""
spark.sql(query).show()


+-------------+---------------+----+------------+
|seller_region|customer_region| cnt|avg_distance|
+-------------+---------------+----+------------+
|       남동부|         남동부|2513|      302.45|
|       남동부|           남부| 258|      657.66|
|       남동부|         북동부| 225|     1825.38|
|       남동부|         중서부| 195|      814.23|
|         남부|         남동부| 150|      685.36|
|         남부|           남부|  80|      475.34|
|       북동부|         남동부|  31|     2048.99|
|       중서부|         남동부|  21|      805.43|
|       남동부|           북부|  20|     2005.64|
|       북동부|         북동부|  17|      458.89|
|         남부|         북동부|  15|     2721.56|
|         남부|         중서부|  10|     1089.24|
|       북동부|           남부|  10|     2470.44|
|       북동부|         중서부|   9|      1570.5|
|       중서부|         중서부|   8|      304.94|
|       중서부|           남부|   4|      1119.8|
|       중서부|         북동부|   4|     1539.25|
|         남부|           북부|   2|     1801.09|
|       중서부|           북부|   2|      895.47|
| 

In [121]:
query = """
    SELECT seller_region , customer_region, ROUND(COUNT(DISTINCT 
        CASE WHEN delay_flag_estimated = 'Yes' THEN order_id END)/COUNT(DISTINCT order_id)*100.0,2) as cnt
        ,ROUND(AVG(distance),2) as avg_distance
    FROM df_view
    WHERE seller_region != '북부'
    GROUP BY seller_region,customer_region
    ORDER BY 3 DESC
"""
spark.sql(query).show()




+-------------+---------------+----+------------+
|seller_region|customer_region| cnt|avg_distance|
+-------------+---------------+----+------------+
|       북동부|         중서부|7.83|     1583.28|
|       북동부|           남부|7.69|     2357.93|
|       중서부|           북부|5.41|     1224.75|
|       남동부|         남동부|4.73|      283.64|
|       남동부|         중서부|4.27|      853.31|
|       북동부|         북동부|4.19|      623.57|
|       북동부|         남동부|4.08|     1531.66|
|       중서부|         중서부|3.76|      258.96|
|       남동부|         북동부| 3.6|     1755.51|
|       북동부|           북부|3.45|     1793.58|
|       중서부|           남부|3.03|     1190.29|
|       남동부|           남부|2.67|      731.04|
|         남부|           남부|2.63|      384.07|
|       중서부|         북동부| 2.5|     1403.66|
|         남부|         북동부|2.22|     2277.25|
|       중서부|         남동부|2.08|      790.29|
|         남부|         남동부| 1.9|      684.39|
|       남동부|           북부|1.71|      2051.3|
|         남부|         중서부|1.52|     1128.53|
|  

                                                                                

In [None]:
import scipy.stats as stats
pdf = df.select('review_score', 'days_to_delivery').toPandas()

# Convert review_score to numeric
pdf['review_score'] = pd.to_numeric(pdf['review_score'])

corr, p_value = stats.pearsonr(pdf['review_score'], pdf['days_to_delivery'])
print("Correlation: ", corr)
print("P-value: ", p_value)



 - 예상 배송시간은 어떻게 산정되는가?
    - 예상 배송시간의 분포..

In [37]:
# Convert Spark DataFrame to Pandas DataFrame
pdf = df.toPandas()

A_grouped = pdf.loc[pdf['delay_flag_estimated'] == 'Yes']['distance'].astype(float)
B_grouped = pdf.loc[pdf['delay_flag_estimated'] == 'No']['distance'].astype(float)

t_stat , p_value = stats.ttest_ind(A_grouped, B_grouped)
print("T-statistic: ", t_stat)
print("P-value: ", p_value)
print("지연 배송 평균 거리(km):", A_grouped.mean())
print("정상 배송 평균 거리(km):", B_grouped.mean())


T-statistic:  -7.186129858260473
P-value:  6.712447462292546e-13
지연 배송 평균 거리(km): 509.4861533180567
정상 배송 평균 거리(km): 577.0121963461853


In [16]:
query = """
    SELECT COUNT(DISTINCT order_id) as cnt, delay_flag_estimated ,seller_state
    FROM df_view
    WHERE delay_flag_estimated = 'Yes'
    GROUP BY delay_flag_estimated, seller_state
    ORDER BY delay_flag_estimated, seller_state
"""
spark.sql(query).show()

+----+--------------------+------------+
| cnt|delay_flag_estimated|seller_state|
+----+--------------------+------------+
|  16|                 Yes|          BA|
|   3|                 Yes|          CE|
|  20|                 Yes|          DF|
|  13|                 Yes|          ES|
|   6|                 Yes|          GO|
|  39|                 Yes|          MA|
| 159|                 Yes|          MG|
|  11|                 Yes|          MS|
|   2|                 Yes|          MT|
|   8|                 Yes|          PE|
| 139|                 Yes|          PR|
| 183|                 Yes|          RJ|
|   1|                 Yes|          RN|
|  40|                 Yes|          RS|
|  77|                 Yes|          SC|
|   1|                 Yes|          SE|
|2843|                 Yes|          SP|
|   1|                 Yes|          TO|
+----+--------------------+------------+



In [36]:
from pyspark.sql.functions import avg

df.agg(avg('review_score').alias('avg_review_score')).show()

+-----------------+
| avg_review_score|
+-----------------+
|4.231621236777868|
+-----------------+



#### 브라질 3대 지역 vs 나머지 비교

-  group_A = ['SP', 'RJ', 'MG'](상파울루 ,미나스제라이스,리우데자네이루), group_B = 나머지
-  group_A의 배송기간과 만족도와 group_B의 배송기간과 만족도를 비교

In [None]:
query = """
    SELECT seller_region , customer_region, COUNT(DISTINCT order_id) as cnt , ROUND(AVG(distance),2) as avg_distance
    FROM df_view
    WHERE delay_flag_estimated = 'Yes' AND
"""