In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import pyspark.pandas as ps
import pandas as pd



In [2]:
import os
import sys
# Make Spark's driver and worker Python processes use this kernel's interpreter.
os.environ.update(
    PYSPARK_PYTHON=sys.executable,
    PYSPARK_DRIVER_PYTHON=sys.executable,
)

In [3]:
# Get the active Spark session for this kernel, creating it if none exists yet.
spark = (
    SparkSession.builder
    .appName("consistency_test")
    .getOrCreate()
)

In [4]:
# Load the cleaned fintech CSV: first line is the header, column types are inferred.
df_fintech = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("clean_fintech.csv")
)
df_fintech.show(n=2)

+-------+-----+----+------------+--------+----------+------------------+---------+--------+--------------+-----------+--------+--------------------+--------------+--------+------------+--------+------------+-----------------+------------+--------------+--------------+-------------+-------------+-----------+--------------+-----------+-----------+
|user_id|churn| age|credit_score|deposits|withdrawal|purchases_partners|purchases|cc_taken|cc_recommended|cc_disliked|cc_liked|cc_application_begin|app_downloaded|web_user|app_web_user|ios_user|android_user|registered_phones|payment_type|waiting_4_loan|cancelled_loan|received_loan|rejected_loan|zodiac_sign|rewards_earned|reward_rate|is_referred|
+-------+-----+----+------------+--------+----------+------------------+---------+--------+--------------+-----------+--------+--------------------+--------------+--------+------------+--------+------------+-----------------+------------+--------------+--------------+-------------+-------------+--------

## Transformations to perform:
- Multiply every numeric column by 2.
- Delete the letter "e" from every string column.
- Set every boolean column to True.
- Create 3 extra numeric columns, each holding the same value on every row:
    - Mean of purchases.
    - Median of age.
    - Mean of credit_score.

In [14]:
# Keep only the eight columns exercised by the transformation test
# (grouped as numeric / string / boolean, matching the expected schema further down).
df_fintech_2 = df_fintech.select(
    'age', 'credit_score', 'purchases',
    'zodiac_sign', 'payment_type',
    'churn', 'cancelled_loan', 'received_loan',
)

In [15]:
# Cast 'age' to IntegerType to match the integer 'age' column of the expected schema.
df_fintech_2 = df_fintech_2.withColumn("age", col("age").cast(IntegerType()))

In [None]:
# Snapshot of the untransformed selection. Spark DataFrames are immutable, so this name
# keeps referring to the pre-transform frame even after df_fintech_2 is reassigned by the
# transform cell; it is the input later passed to assert_transform.
df_fintech_3 = df_fintech_2

In [None]:
# Preview three rows of the working frame.
df_fintech_2.show(n=3)

In [None]:
# Print column names, data types and nullability of the working frame (check that 'age' is now int).
df_fintech_2.printSchema()

In [None]:
# Earlier inline draft of the extra-columns step, superseded by transform_extracols (kept for reference only).
# df_fintech_2 = df_fintech_2.withColumn("purchases_mean", lit(df_fintech_2.select(mean('purchases')).collect()[0][0]))\
#              .withColumn("score_mean", lit(df_fintech_2.select(mean('credit_score')).collect()[0][0]))\
#             .withColumn("age_median", lit(df_fintech_2.select(median('age')).collect()[0][0]))

In [7]:
def transform_bool(df):
    """Return ``df`` with every BooleanType column overwritten by the literal True.

    Columns of any other type are left untouched.
    """
    result = df
    for field in df.schema.fields:
        if not isinstance(field.dataType, BooleanType):
            continue
        result = result.withColumn(field.name, lit(True))
    return result

def transform_str(df):
    """Return ``df`` with every lowercase ``'e'`` removed from its StringType columns.

    The pattern is case-sensitive, so an uppercase 'E' is kept. Non-string columns
    are passed through unchanged.
    """
    string_cols = (field.name for field in df.schema.fields
                   if isinstance(field.dataType, StringType))
    out = df
    for name in string_cols:
        out = out.withColumn(name, regexp_replace(name, 'e', ''))
    return out

def transform_numeric(df):
    """Return ``df`` with every numeric column multiplied by 2.

    Any ``NumericType`` column is matched (byte, short, int, long, float, double,
    decimal), not only IntegerType and DoubleType. ``inferSchema=True`` can yield
    LongType or other numeric types, which a narrower check would silently skip.
    Boolean and string columns are left untouched.
    """
    numeric_cols = [f.name for f in df.schema.fields if isinstance(f.dataType, NumericType)]
    for c in numeric_cols:
        df = df.withColumn(c, df[c] * 2)
    return df

def transform_extracols(df):
    """Append three constant columns holding statistics computed over all rows of ``df``.

    Added columns (in this order):
      - ``purchases_mean``: mean of ``purchases``
      - ``score_mean``: mean of ``credit_score``
      - ``age_median``: median of ``age`` (``functions.median`` needs Spark >= 3.4)

    Each statistic is collected to the driver and attached as a literal, so every row
    carries the same value. If ``df`` has no rows the aggregates are None; those columns
    are then null doubles rather than untyped (NullType) literals.
    """
    # A single aggregation job instead of three separate collect() calls: each collect
    # re-executes the full (uncached) lineage, including the CSV scan.
    stats = df.agg(
        mean('purchases').alias('purchases_mean'),
        mean('credit_score').alias('score_mean'),
        median('age').alias('age_median'),
    ).first()
    for name in ('purchases_mean', 'score_mean', 'age_median'):
        value = stats[name]
        df = df.withColumn(name, lit(value) if value is not None else lit(None).cast('double'))
    return df

In [12]:
# Build from the untouched snapshot df_fintech_3 rather than from df_fintech_2 itself, so
# re-running this cell does not double the numeric columns again or stack extra stats.
df_fintech_2 = (
    df_fintech_3
    .transform(transform_str)
    .transform(transform_numeric)
    .transform(transform_bool)
    .transform(transform_extracols)
)
# result = (
#    df.lazy()
#    .pipe(add_position_column)
#    .pipe(add_squad_number_column)
#    .collect()
#)
#
#result
# https://typethepipe.com/vizs-and-tips/python-polars-pipe-function-to-one-more-columns/ 

In [13]:
# Display 20 rows (Spark's default for show) of the transformed frame.
df_fintech_2.show(n=20)

+---+-----------------+---------+-----------+------------+-----+--------------+-------------+-----------------+------------------+----------+
|age|     credit_score|purchases|zodiac_sign|payment_type|churn|cancelled_loan|received_loan|   purchases_mean|        score_mean|age_median|
+---+-----------------+---------+-----------+------------+-----+--------------+-------------+-----------------+------------------+----------+
| 42|           1154.0|       90|      Piscs| Smi-Monthly| true|          true|         true|6.318724749692605|1085.1526258518454|      60.0|
| 62|           1038.0|        0|      Virgo|     Bi-Wkly| true|          true|         true|6.318724749692605|1085.1526258518454|      60.0|
| 52|1085.031199631591|        0|Sagittarius|        Wkly| true|          true|         true|6.318724749692605|1085.1526258518454|      60.0|
| 66|           1116.0|        0|         Lo|     Bi-Wkly| true|          true|         true|6.318724749692605|1085.1526258518454|      60.0|
| 52| 

In [None]:
# Golden rows for the transformed frame; the values match the rows printed by
# df_fintech_2.show() above. The three aggregate columns repeat on every row.
pandas_df = pd.DataFrame({
    'age': [42, 62, 52, 66, 52],
    'credit_score': [1154.0, 1038.0, 1085.031199631591, 1116.0, 1118.0],
    'purchases': [90, 0, 0, 0, 0],
    'zodiac_sign': ['Piscs', 'Virgo', 'Sagittarius', 'Lo', 'Virgo'],
    'payment_type': ['Smi-Monthly', 'Bi-Wkly', 'Wkly', 'Bi-Wkly', 'Bi-Wkly'],
    'churn': [True] * 5,
    'cancelled_loan': [True] * 5,
    'received_loan': [True] * 5,
    'purchases_mean': [6.318724749692605] * 5,
    'score_mean': [1085.1526258518454] * 5,
    'age_median': [60.0] * 5,
})

# Booleans and aggregates are declared non-nullable; the remaining fields use
# StructField's default (nullable).
pyspark_schema = StructType([
    StructField('age', IntegerType()),
    StructField('credit_score', DoubleType()),
    StructField('purchases', IntegerType()),
    StructField('zodiac_sign', StringType()),
    StructField('payment_type', StringType()),
    StructField('churn', BooleanType(), nullable=False),
    StructField('cancelled_loan', BooleanType(), nullable=False),
    StructField('received_loan', BooleanType(), nullable=False),
    StructField('purchases_mean', DoubleType(), nullable=False),
    StructField('score_mean', DoubleType(), nullable=False),
    StructField('age_median', DoubleType(), nullable=False),
])

df_expected = spark.createDataFrame(pandas_df, pyspark_schema)

In [None]:
# Schema of the transformed frame, for comparison with the expected schema in the next cell.
df_fintech_2.printSchema()

In [None]:
# Schema of the hand-built golden frame.
df_expected.printSchema()

In [None]:
# Five rows of the transformed frame as Row objects (take(n) is limit(n).collect()).
df_fintech_2.take(5)

In [None]:
# All rows of the golden frame as Row objects, for side-by-side inspection with the cell above.
df_expected.collect()

In [None]:
# df_orig.withColumn('hash_value', hash(*sorted(df_orig.columns))).select('hash_value').show(10)
# df_expected.withColumn('hash_value', hash(*sorted(df_expected.columns))).select('hash_value').show(10)
# Sum the hashes, see https://shortest.link/28YE
# value = df.agg(F.sum('hash_value')).collect()[0][0]
# https://stackoverflow.com/questions/52619099/pytest-assert-for-pyspark-dataframe-comparison

In [None]:
# Compare order-insensitively; note that limit() without orderBy does not guarantee
# which rows Spark returns.
expected_rows = sorted(df_expected.collect())
actual_rows = sorted(df_fintech_2.limit(len(expected_rows)).collect())
# Put both row lists in the failure message so a mismatch can be diagnosed directly.
assert expected_rows == actual_rows, (
    "Assertion failed\n"
    f"expected: {expected_rows}\n"
    f"actual:   {actual_rows}"
)
print("Assertion completed successfully!")

---

In [10]:
# Apply the transformation pipeline to df_orig inside the assert function.
def assert_transform(df_orig, df_expected=None):
    """Transform ``df_orig`` with the notebook pipeline and compare it to golden rows.

    The pipeline is transform_str -> transform_numeric -> transform_bool ->
    transform_extracols. The transformed frame is printed with ``show()``. Then
    ``len(expected)`` of its rows (via ``limit``) are compared with the expected rows,
    ignoring row order.

    Parameters
    ----------
    df_orig : pyspark.sql.DataFrame
        Untransformed input. The aggregate columns are computed over every row of
        ``df_orig``, so the default golden values only hold when it is the full column
        selection of ``clean_fintech.csv``. A subset (e.g. 5 rows) yields different
        means/median.
    df_expected : pyspark.sql.DataFrame, optional
        Golden frame to compare against. Defaults to the hard-coded five-row frame
        built by ``_expected_fintech_df``.

    Raises
    ------
    AssertionError
        If the compared rows differ; the message lists both expected and actual rows.

    Notes
    -----
    ``limit`` without ``orderBy`` does not guarantee which rows Spark returns.
    """
    df_orig = (
        df_orig
        .transform(transform_str)
        .transform(transform_numeric)
        .transform(transform_bool)
        .transform(transform_extracols)
    )
    if df_expected is None:
        df_expected = _expected_fintech_df()
    df_orig.show()
    expected_rows = sorted(df_expected.collect())
    actual_rows = sorted(df_orig.limit(len(expected_rows)).collect())
    assert expected_rows == actual_rows, (
        "Assertion failed\n"
        f"expected: {expected_rows}\n"
        f"actual:   {actual_rows}"
    )
    print("Assertion completed successfully!")


def _expected_fintech_df():
    """Build the golden five-row frame expected after the transform pipeline."""
    pandas_df = pd.DataFrame({
        'age': [42, 62, 52, 66, 52],
        'credit_score': [1154.0, 1038.0, 1085.031199631591, 1116.0, 1118.0],
        'purchases': [90, 0, 0, 0, 0],
        'zodiac_sign': ['Piscs', 'Virgo', 'Sagittarius', 'Lo', 'Virgo'],
        'payment_type': ['Smi-Monthly', 'Bi-Wkly', 'Wkly', 'Bi-Wkly', 'Bi-Wkly'],
        'churn': [True] * 5,
        'cancelled_loan': [True] * 5,
        'received_loan': [True] * 5,
        'purchases_mean': [6.318724749692605] * 5,
        'score_mean': [1085.1526258518454] * 5,
        'age_median': [60.0] * 5,
    })
    pyspark_schema = StructType([
        StructField('age', IntegerType()),
        StructField('credit_score', DoubleType()),
        StructField('purchases', IntegerType()),
        StructField('zodiac_sign', StringType()),
        StructField('payment_type', StringType()),
        StructField('churn', BooleanType(), False),
        StructField('cancelled_loan', BooleanType(), False),
        StructField('received_loan', BooleanType(), False),
        StructField('purchases_mean', DoubleType(), False),
        StructField('score_mean', DoubleType(), False),
        StructField('age_median', DoubleType(), False),
    ])
    return spark.createDataFrame(pandas_df, pyspark_schema)

In [11]:
# Run the end-to-end check on the untransformed snapshot.
assert_transform(df_orig=df_fintech_3)

+---+-----------------+---------+-----------+------------+-----+--------------+-------------+--------------+------------------+----------+
|age|     credit_score|purchases|zodiac_sign|payment_type|churn|cancelled_loan|received_loan|purchases_mean|        score_mean|age_median|
+---+-----------------+---------+-----------+------------+-----+--------------+-------------+--------------+------------------+----------+
| 42|           1154.0|       90|      Piscs| Smi-Monthly| true|          true|         true|          18.0|1102.2062399263182|      52.0|
| 62|           1038.0|        0|      Virgo|     Bi-Wkly| true|          true|         true|          18.0|1102.2062399263182|      52.0|
| 52|1085.031199631591|        0|Sagittarius|        Wkly| true|          true|         true|          18.0|1102.2062399263182|      52.0|
| 66|           1116.0|        0|         Lo|     Bi-Wkly| true|          true|         true|          18.0|1102.2062399263182|      52.0|
| 52|           1118.0|    

AssertionError: Assertion failed

In [None]:
# From Spark 3.5.0 onwards, pyspark.testing.assertDataFrameEqual can replace the manual row comparison:
# https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.testing.assertDataFrameEqual.html