<a href="https://colab.research.google.com/github/tatianaespinola/conjunto-de-dados-de-transacoes-financeiras-limpeza-e-breve-analise/blob/main/PySpark_Finance.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [109]:
# Instalar Java e Spark
!apt-get install openjdk-11-jdk-headless -qq > /dev/null

# Define Spark version and download URL
spark_version = "spark-3.1.2"
hadoop_version = "hadoop2.7"
spark_file = f"{spark_version}-bin-{hadoop_version}.tgz"
spark_url = f"https://archive.apache.org/dist/spark/{spark_version}/{spark_file}"

# Download Spark
!wget -q {spark_url}

# Check if download was successful and extract
import os
if os.path.exists(spark_file):
    print(f"{spark_file} downloaded successfully. Extracting...")
    !tar xf {spark_file}
    print("Extraction complete.")
else:
    print(f"Error: {spark_file} not downloaded.")


# Configurar variáveis
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-{hadoop_version}"

# Instalar findspark e pyspark e inicializar findspark
!pip install -q findspark pyspark
import findspark
findspark.init()

spark-3.1.2-bin-hadoop2.7.tgz downloaded successfully. Extracting...
Extraction complete.


In [110]:
import findspark
findspark.init()
from pyspark.sql.functions import when, col, count, isnan, regexp_replace, sum
from pyspark.sql import functions as F
from pyspark.sql import SparkSession
from functools import reduce

spark = SparkSession.builder.master("local[*]").getOrCreate()

In [111]:
# Criação de um DataFrame
df = spark.read.csv("/content/transactions_data.csv", header=True, inferSchema = True)

In [112]:
# Visualização do esquema do DataFrame incluindo nome das colunas e datatype
df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- date: string (nullable = true)
 |-- client_id: integer (nullable = true)
 |-- card_id: integer (nullable = true)
 |-- amount: string (nullable = true)
 |-- use_chip: string (nullable = true)
 |-- merchant_id: integer (nullable = true)
 |-- merchant_city: string (nullable = true)
 |-- merchant_state: string (nullable = true)
 |-- zip: double (nullable = true)
 |-- mcc: integer (nullable = true)
 |-- errors: string (nullable = true)



In [113]:
# Visualização de dados do DataFrame
df.show()

+-------+-------------------+---------+-------+-------+------------------+-----------+---------------+--------------+-------+----+------+
|     id|               date|client_id|card_id| amount|          use_chip|merchant_id|  merchant_city|merchant_state|    zip| mcc|errors|
+-------+-------------------+---------+-------+-------+------------------+-----------+---------------+--------------+-------+----+------+
|7475327|2010-01-01 00:01:00|     1556|   2972|$-77.00| Swipe Transaction|      59935|         Beulah|            ND|58523.0|5499|  null|
|7475328|2010-01-01 00:02:00|      561|   4575| $14.57| Swipe Transaction|      67570|     Bettendorf|            IA|52722.0|5311|  null|
|7475329|2010-01-01 00:02:00|     1129|    102| $80.00| Swipe Transaction|      27092|          Vista|            CA|92084.0|4829|  null|
|7475331|2010-01-01 00:05:00|      430|   2860|$200.00| Swipe Transaction|      27092|    Crown Point|            IN|46307.0|4829|  null|
|7475332|2010-01-01 00:06:00|     

In [114]:
# Contagem do número de linhas do DataFrame
df.count()

4999

In [115]:
# Exclusão de linhas duplicadas e contagem após essa exclusão
df.dropDuplicates().count()

4999

In [116]:
# Modificar os dados que estão como ''null'' na coluna 'errors'
df = df.withColumn(
    "errors",
    when(col("errors").isNull(), "Sem erro").otherwise(col("errors"))
)

In [117]:
df.show()

+-------+-------------------+---------+-------+-------+------------------+-----------+---------------+--------------+-------+----+--------+
|     id|               date|client_id|card_id| amount|          use_chip|merchant_id|  merchant_city|merchant_state|    zip| mcc|  errors|
+-------+-------------------+---------+-------+-------+------------------+-----------+---------------+--------------+-------+----+--------+
|7475327|2010-01-01 00:01:00|     1556|   2972|$-77.00| Swipe Transaction|      59935|         Beulah|            ND|58523.0|5499|Sem erro|
|7475328|2010-01-01 00:02:00|      561|   4575| $14.57| Swipe Transaction|      67570|     Bettendorf|            IA|52722.0|5311|Sem erro|
|7475329|2010-01-01 00:02:00|     1129|    102| $80.00| Swipe Transaction|      27092|          Vista|            CA|92084.0|4829|Sem erro|
|7475331|2010-01-01 00:05:00|      430|   2860|$200.00| Swipe Transaction|      27092|    Crown Point|            IN|46307.0|4829|Sem erro|
|7475332|2010-01-01 

In [118]:
#Exclua linhas que tem valores nulos em todas as colunas
df = df.dropna(how="all")

In [119]:
df.count()

4999

In [120]:
from pyspark.sql import functions as F

# Contar valores nulos, vazios ou NaN por coluna
null_counts = df.select([
    count(
        when(
            col(c).isNull() | (col(c) == '') | isnan(c), c
        )
    ).alias(c)
    for c in df.columns
])

# Transpor para formato (coluna, qtd_nulos)
null_counts = null_counts.toPandas().T.reset_index()
null_counts.columns = ['coluna', 'qtd_nulos']

# Ordenar do maior para o menor
null_counts = null_counts.sort_values(by='qtd_nulos', ascending=False)

# Exibir resultado
null_counts


Unnamed: 0,coluna,qtd_nulos
9,zip,644
8,merchant_state,635
0,id,0
1,date,0
3,card_id,0
2,client_id,0
4,amount,0
5,use_chip,0
7,merchant_city,0
6,merchant_id,0


In [121]:
df = df.withColumn(
    "merchant_state",
    when((col("merchant_city") == "ONLINE") & col("merchant_state").isNull(), "ONLINE")
    .otherwise(col("merchant_state"))
).withColumn(
    "zip",
    when((col("merchant_city") == "ONLINE") & col("zip").isNull(), "0")
    .otherwise(col("zip"))
)

In [122]:
df.show()

+-------+-------------------+---------+-------+-------+------------------+-----------+---------------+--------------+-------+----+--------+
|     id|               date|client_id|card_id| amount|          use_chip|merchant_id|  merchant_city|merchant_state|    zip| mcc|  errors|
+-------+-------------------+---------+-------+-------+------------------+-----------+---------------+--------------+-------+----+--------+
|7475327|2010-01-01 00:01:00|     1556|   2972|$-77.00| Swipe Transaction|      59935|         Beulah|            ND|58523.0|5499|Sem erro|
|7475328|2010-01-01 00:02:00|      561|   4575| $14.57| Swipe Transaction|      67570|     Bettendorf|            IA|52722.0|5311|Sem erro|
|7475329|2010-01-01 00:02:00|     1129|    102| $80.00| Swipe Transaction|      27092|          Vista|            CA|92084.0|4829|Sem erro|
|7475331|2010-01-01 00:05:00|      430|   2860|$200.00| Swipe Transaction|      27092|    Crown Point|            IN|46307.0|4829|Sem erro|
|7475332|2010-01-01 

In [123]:
# Contar valores nulos, vazios ou NaN por coluna
null_counts = df.select([
    count(
        when(
            col(c).isNull() | (col(c) == '') | isnan(c), c
        )
    ).alias(c)
    for c in df.columns
])

# Transpor para formato (coluna, qtd_nulos)
null_counts = null_counts.toPandas().T.reset_index()
null_counts.columns = ['coluna', 'qtd_nulos']

# Ordenar do maior para o menor
null_counts = null_counts.sort_values(by='qtd_nulos', ascending=False)

# Exibir resultado
null_counts




Unnamed: 0,coluna,qtd_nulos
9,zip,9
0,id,0
2,client_id,0
1,date,0
3,card_id,0
4,amount,0
6,merchant_id,0
5,use_chip,0
7,merchant_city,0
8,merchant_state,0


In [124]:
df.filter(
    reduce(lambda a, b: a | b, (col(c).isNull() for c in df.columns))
).show()


+-------+-------------------+---------+-------+------+-----------------+-----------+---------------+------------------+----+----+--------+
|     id|               date|client_id|card_id|amount|         use_chip|merchant_id|  merchant_city|    merchant_state| zip| mcc|  errors|
+-------+-------------------+---------+-------+------+-----------------+-----------+---------------+------------------+----+----+--------+
|7476010|2010-01-01 07:53:00|     1579|   3830| $6.51|Swipe Transaction|      22204|Puerto Vallarta|            Mexico|null|5541|Sem erro|
|7476549|2010-01-01 10:03:00|      363|   5555| $8.16|Swipe Transaction|      93391|   Vatican City|      Vatican City|null|5812|Sem erro|
|7476704|2010-01-01 10:37:00|      363|   5555|$11.66|Swipe Transaction|      93391|   Vatican City|      Vatican City|null|5812|Sem erro|
|7477375|2010-01-01 12:50:00|     1266|   2478| $9.77|Swipe Transaction|      22204|    Guadalajara|            Mexico|null|5541|Sem erro|
|7477534|2010-01-01 13:19:0

In [125]:
# Modificar os dados que estão como ''null'' na coluna 'zip'
df = df.withColumn(
    "zip",
    when(col("zip").isNull(), "ZIP não informado").otherwise(col("zip"))
)

In [126]:
null_counts


Unnamed: 0,coluna,qtd_nulos
9,zip,9
0,id,0
2,client_id,0
1,date,0
3,card_id,0
4,amount,0
6,merchant_id,0
5,use_chip,0
7,merchant_city,0
8,merchant_state,0


In [127]:
df = (df
    .withColumnRenamed("zip", "zip_code")
    .withColumnRenamed("mcc", "merchant_category_code")
)


In [128]:
df.show()

+-------+-------------------+---------+-------+-------+------------------+-----------+---------------+--------------+--------+----------------------+--------+
|     id|               date|client_id|card_id| amount|          use_chip|merchant_id|  merchant_city|merchant_state|zip_code|merchant_category_code|  errors|
+-------+-------------------+---------+-------+-------+------------------+-----------+---------------+--------------+--------+----------------------+--------+
|7475327|2010-01-01 00:01:00|     1556|   2972|$-77.00| Swipe Transaction|      59935|         Beulah|            ND| 58523.0|                  5499|Sem erro|
|7475328|2010-01-01 00:02:00|      561|   4575| $14.57| Swipe Transaction|      67570|     Bettendorf|            IA| 52722.0|                  5311|Sem erro|
|7475329|2010-01-01 00:02:00|     1129|    102| $80.00| Swipe Transaction|      27092|          Vista|            CA| 92084.0|                  4829|Sem erro|
|7475331|2010-01-01 00:05:00|      430|   2860

In [129]:
# Clean the 'amount' column by removing '$' and ',' and casting to double
df = df.withColumn("amount", regexp_replace(col("amount"), "[$,]", "").cast("double"))

df.select("amount").show()

+------+
|amount|
+------+
| -77.0|
| 14.57|
|  80.0|
| 200.0|
| 46.41|
|  4.81|
|  77.0|
| 26.46|
|261.58|
| 10.74|
|  3.51|
|  2.58|
| 39.63|
| 43.33|
| 49.42|
|  1.09|
| 73.79|
| 100.0|
| 26.04|
| -64.0|
+------+
only showing top 20 rows



In [130]:
# Calculate the total amount spent per client
total_gasto_por_cliente = df.groupBy("client_id").agg(
    sum("amount").alias("total_gasto_por_cliente")
)

# Show the result
total_gasto_por_cliente.show()

+---------+-----------------------+
|client_id|total_gasto_por_cliente|
+---------+-----------------------+
|      148|                  75.16|
|     1591|                  153.0|
|     1238|                 533.67|
|     1645|     -3.270000000000005|
|     1959|                  29.33|
|     1088|                 212.59|
|      496|                 172.73|
|     1127|                 157.29|
|      858|                 134.02|
|     1084|     10.629999999999999|
|      737|     216.09000000000003|
|     1507|                 116.55|
|     1896|                 207.49|
|      623|                 124.37|
|     1618|                  146.2|
|     1352|                 219.29|
|     1903|                 155.96|
|     1699|                 271.99|
|      137|                 329.58|
|      580|                 200.65|
+---------+-----------------------+
only showing top 20 rows



In [131]:
total_gasto_por_cliente.write.parquet("/content/total_gasto_por_cliente.parquet")

AnalysisException: path file:/content/total_gasto_por_cliente.parquet already exists.

In [None]:
#Total gasto por categoria

total_gasto_por_categoria= df.groupBy("use_chip").agg(
    sum("amount").alias("total_gasto_por_categoria")
)

# Show the result
total_gasto_por_categoria.show()

In [None]:
total_gasto_por_categoria.write.parquet("/content/total_gasto_por_categoria.parquet")

In [None]:
#Salvar dados no formato parquet
df.write.parquet("/content/transactions_dados_tratados.parquet")