<a href="https://colab.research.google.com/github/vitornimschofsky/spark/blob/main/spark_sense.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

**PARTE 0**

**CONFIGURANDO O SPARK NO AMBIENTE DO COLAB**

In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.0.1/spark-3.0.1-bin-hadoop2.7.tgz
!tar xf spark-3.0.1-bin-hadoop2.7.tgz
!pip install -q findspark

In [2]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.1-bin-hadoop2.7"


In [3]:
import findspark
findspark.init()

from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()


**PARTE 1**

**GERANDO O DF**

In [6]:
spark = SparkSession.builder.appName("PySparkTest").getOrCreate()


data = [
    ("Alice", 34, "Data Scientist"),
    ("Bob", 45, "Data Engineer"),
    ("Cathy", 29, "Data Analyst"),
    ("David", 35, "Data Scientist")
]
columns = ["Name", "Age", "Occupation"]


df = spark.createDataFrame(data, schema=columns)
df.show()


+-----+---+--------------+
| Name|Age|    Occupation|
+-----+---+--------------+
|Alice| 34|Data Scientist|
|  Bob| 45| Data Engineer|
|Cathy| 29|  Data Analyst|
|David| 35|Data Scientist|
+-----+---+--------------+



**NOME E IDADE DE QUEM TEM MAIS DE 30 ANOS**

In [8]:
df_selected = df.select("Name", "Age")

df_filtered = df_selected.filter(df_selected["Age"] > 30)
df_filtered.show()


+-----+---+
| Name|Age|
+-----+---+
|Alice| 34|
|  Bob| 45|
|David| 35|
+-----+---+



**CALCULANDO A MÉDIA DE IDADE POR OCUPAÇÃO**

In [9]:
from pyspark.sql.functions import avg

df_grouped = df.groupBy("Occupation").agg(avg("Age").alias("Average_Age"))
df_grouped.show()


+--------------+-----------+
|    Occupation|Average_Age|
+--------------+-----------+
|Data Scientist|       34.5|
|  Data Analyst|       29.0|
| Data Engineer|       45.0|
+--------------+-----------+



**ORDENANDO PELA MÉDIA DE IDADE EM ORDEM DESC**

In [10]:
df_sorted = df_grouped.orderBy("Average_Age", ascending=False)
df_sorted.show()


+--------------+-----------+
|    Occupation|Average_Age|
+--------------+-----------+
| Data Engineer|       45.0|
|Data Scientist|       34.5|
|  Data Analyst|       29.0|
+--------------+-----------+



**PARTE 2**

**UDFs (User Defined Functions)**




In [11]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def categorize_age(age):
    if age < 30:
        return "Jovem"
    elif 30 <= age <= 40:
        return "Adulto"
    else:
        return "Senior"

categorize_age_udf = udf(categorize_age, StringType())

df_with_category = df.withColumn("Age_Category", categorize_age_udf(df["Age"]))
df_with_category.show()


+-----+---+--------------+------------+
| Name|Age|    Occupation|Age_Category|
+-----+---+--------------+------------+
|Alice| 34|Data Scientist|      Adulto|
|  Bob| 45| Data Engineer|      Senior|
|Cathy| 29|  Data Analyst|       Jovem|
|David| 35|Data Scientist|      Adulto|
+-----+---+--------------+------------+



**FUNÇÃO DE JANELA** ***(Não sei se entendi corretamente esse calculo, levei em consideração a criação de apenas uma coluna conforme descrição da questão de janelas.)


In [18]:
from pyspark.sql.window import Window
from pyspark.sql.functions import avg

window_spec = Window.partitionBy("Occupation")

df_with_diff = (df.withColumn("Avg", avg("Age").over(window_spec))
                  .withColumn("Age_Diff_From_Avg", df["Age"] - avg("Age").over(window_spec))
)
df_with_diff.show()

+-----+---+--------------+----+-----------------+
| Name|Age|    Occupation| Avg|Age_Diff_From_Avg|
+-----+---+--------------+----+-----------------+
|Alice| 34|Data Scientist|34.5|             -0.5|
|David| 35|Data Scientist|34.5|              0.5|
|Cathy| 29|  Data Analyst|29.0|              0.0|
|  Bob| 45| Data Engineer|45.0|              0.0|
+-----+---+--------------+----+-----------------+



**PARTE 3**

**PARTICIONAMENTE E OTIMIZAÇÃO**

Particionamento é uma estratégia usada para dividir um conjunto de dados em várias partes menores, chamadas de partições, com base em uma coluna específica. No PySpark, essa abordagem pode acelerar as operações de leitura e escrita, pois o Spark consegue processar as partições simultaneamente. Isso é especialmente vantajoso para grandes volumes de dados, onde as operações de entrada e saída (I/O) podem se tornar lentas. Ao particionar de forma eficaz, o Spark acessa apenas as partições necessárias para uma operação, evitando a leitura de dados desnecessários e melhorando o desempenho geral.

In [None]:
# exemplo de particionamento de um dataFrame por uma coluna específica ("Date")
df_partitioned = df.repartition("Date")

df_partitioned.write.partitionBy("Date").parquet("/path/to/output/dir")

**BROADCAST JOIN**

O Broadcast Join é uma técnica no PySpark que melhora o desempenho das operações de join, especialmente quando um dos DataFrames é pequeno o bastante para ser armazenado na memória. Em vez de fazer um join distribuído, que pode consumir muito tempo e recursos, o Spark envia (ou "broadcast") o DataFrame menor para todos os nós do cluster. Assim, cada nó pode realizar o join de forma local, evitando a redistribuição de dados entre os nós.

In [19]:
from pyspark.sql.functions import broadcast


df_large = spark.read.parquet("/path/to/large_dataset")
df_small = spark.read.parquet("/path/to/small_dataset")


df_joined = df_large.join(broadcast(df_small), "key_column")

df_joined.show()


AnalysisException: Path does not exist: file:/path/to/large_dataset;

**PARTE 4**

**INTEGRAÇÃO COM OUTRAS TECNOLOGIAS**

In [20]:
# leitura arquivo CSV
df = spark.read.csv("/path/to/input.csv", header=True, inferSchema=True)

# salvar o dataFrame em formato parquet
df.write.parquet("/path/to/output.parquet")


AnalysisException: Path does not exist: file:/path/to/input.csv;

**INTEGRAÇÃO COM O HADOOP**


PySpark se integra facilmente com o Hadoop HDFS (Hadoop Distributed File System) para leitura e escrita de dados. O Spark pode ler e escrever arquivos diretamente no HDFS, permitindo o processamento de grandes volumes de dados distribuídos. Isso é feito através do uso do URI do HDFS ao especificar o caminho do arquivo.

In [None]:
# leitura de um arquivo do HDFS
df = spark.read.csv("hdfs://namenode:9000/path/to/input.csv", header=True, inferSchema=True)

# processamento do dataFrame (apens um exemplo)
df_filtered = df.filter(df["column"] > 100)

# escrita do resultado de volta no HDFS
df_filtered.write.csv("hdfs://namenode:9000/path/to/output.csv")


**PARTE 5**

**LOG**

**Carregar o arquivo de log em um DataFrame**

In [None]:
df_logs = spark.read.csv("/path/to/log  .csv", header=True, inferSchema=True)


**Contar o número de ações realizadas por cada usuário**

In [None]:
df_user_actions = df_logs.groupBy("user_id").count()


**Encontrar os 10 usuários mais ativos**

In [None]:
df_top_users = df_user_actions.orderBy("count", ascending=False).limit(10)
df_top_users.show()


**Salvar o resultado em um arquivo CSV**

In [None]:
df_top_users.write.csv("/path/to/output/top_users.csv", header=True)
