# Municípios do Brasil

## Preparação do Ambiente

In [75]:
from pyspark.sql import SparkSession
from pyspark.sql.types import DateType
import pyspark.sql.functions as f

# Create (or reuse) the Spark session; the GraphFrames package is requested via spark.jars.packages.
spark = (
    SparkSession.builder
    .appName("Clusterização de Pontos Próximos")
    .config("spark.jars.packages", "graphframes:graphframes:0.8.2-spark3.0-s_2.12")
    .getOrCreate()
)

24/06/07 00:09:46 WARN Utils: Your hostname, codespaces-8e509d resolves to a loopback address: 127.0.0.1; using 172.16.5.4 instead (on interface eth0)
24/06/07 00:09:46 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


:: loading settings :: url = jar:file:/usr/local/python/3.10.13/lib/python3.10/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/codespace/.ivy2/cache
The jars for the packages stored in: /home/codespace/.ivy2/jars
graphframes#graphframes added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-aae78927-36f3-4f78-8a10-73f1d16088b8;1.0
	confs: [default]
	found graphframes#graphframes;0.8.2-spark3.0-s_2.12 in spark-packages
	found org.slf4j#slf4j-api;1.7.16 in central
downloading https://repos.spark-packages.org/graphframes/graphframes/0.8.2-spark3.0-s_2.12/graphframes-0.8.2-spark3.0-s_2.12.jar ...
	[SUCCESSFUL ] graphframes#graphframes;0.8.2-spark3.0-s_2.12!graphframes.jar (37ms)
downloading https://repo1.maven.org/maven2/org/slf4j/slf4j-api/1.7.16/slf4j-api-1.7.16.jar ...
	[SUCCESSFUL ] org.slf4j#slf4j-api;1.7.16!slf4j-api.jar (25ms)
:: resolution report :: resolve 1122ms :: artifacts dl 65ms
	:: modules in use:
	graphframes#graphframes;0.8.2-spark3.0-s_2.12 from spark-packages in [default]
	org.slf4j#slf4j-api;1.7.16 from central in [default]
	-----

Definição do diretório de checkpoint (necessário para o funcionamento da biblioteca GraphFrames).

In [114]:
import os
# Set up the Spark checkpoint directory (the notebook notes it is required by GraphFrame).
checkpoint_dir = "/tmp/spark-checkpoint"
# Create the directory if it does not exist yet; an existing one is left as is.
os.makedirs(checkpoint_dir, exist_ok=True)
spark.sparkContext.setCheckpointDir(checkpoint_dir)

## Carga dos Dados

In [82]:
# Load the municipalities CSV (';'-separated, first line is the header)
# and keep only the columns used in the rest of the notebook.
df = (
    spark.read
    .csv('/workspaces/codespaces-jupyter/data/MunicipiosBrasil.csv', header=True, sep=';')
    .select('ID', 'LATITUDE', 'LONGITUDE', 'MUNICIPIO', 'UF')
)

In [83]:
# Replace ',' with '.' in both coordinate columns (presumably the file uses a
# decimal comma) and cast the result to float.
df = (
    df
    .withColumn("LATITUDE", f.regexp_replace(f.col("LATITUDE"), ',', '.').cast("float"))
    .withColumn("LONGITUDE", f.regexp_replace(f.col("LONGITUDE"), ',', '.').cast("float"))
)

In [84]:
# Print a preview of the loaded rows to check the parsed coordinates.
df.show()

+---+--------+---------+--------------------+---+
| ID|LATITUDE|LONGITUDE|           MUNICIPIO| UF|
+---+--------+---------+--------------------+---+
|  1|   -9.82|   -66.88|          ACRELANDIA| AC|
|  2|  -10.94|   -69.56|        ASSIS BRASIL| AC|
|  3|  -11.01|   -68.74|           BRASILEIA| AC|
|  4|   -9.83|   -67.95|              BUJARI| AC|
|  5|  -10.57|   -67.67|            CAPIXABA| AC|
|  6|   -7.63|   -72.67|     CRUZEIRO DO SUL| AC|
|  7|  -11.02|   -68.74|      EPITACIOLANDIA| AC|
|  8|   -8.16|   -70.35|               FEIJO| AC|
|  9|   -9.43|   -71.88|              JORDAO| AC|
| 10|   -7.61|   -72.89|         MANCIO LIMA| AC|
| 11|   -8.83|   -69.26|       MANOEL URBANO| AC|
| 12|   -8.94|   -72.79|MARECHAL THAUMATURGO| AC|
| 13|  -10.27|   -67.15|   PLACIDO DE CASTRO| AC|
| 14|   -9.58|   -67.53|          PORTO ACRE| AC|
| 15|   -8.26|   -72.74|        PORTO WALTER| AC|
| 16|   -9.97|   -67.81|          RIO BRANCO| AC|
| 17|   -7.74|   -72.64|     RODRIGUES ALVES| AC|


In [85]:
# Number of rows in the loaded DataFrame.
df.count()

5509

## CrossJoin

O objetivo é gerar todas as combinações possíveis de pares de pontos (municípios), para depois calcular a distância entre os dois pontos de cada par.

In [87]:
# Same data as df, with every column renamed to carry a "_B" suffix so the two
# sides of the pairwise join have distinct column names.
df_b = df.toDF(*(f"{name}_B" for name in df.columns))

In [93]:
# Cartesian product of the points, keeping only rows with ID < ID_B so each
# unordered pair appears once and no point is paired with itself.
# NOTE(review): ID is presumably still a string column (CSV read without schema
# inference), so the comparison is lexicographic; any strict ordering works here.
df_full = df.crossJoin(df_b).filter(f.col('ID') < f.col('ID_B'))

## Calculando a distância

In [105]:
def haversine_distance(df, earth_radius=6371000):
    '''
    Append the great-circle (Haversine) distance between two points to each row.

    The coordinates are read from the columns LATITUDE / LONGITUDE (first point)
    and LATITUDE_B / LONGITUDE_B (second point). They are converted to radians,
    the latitude and longitude differences are computed, and the Haversine
    formula is applied.

    Parameters
    ----------
    df : DataFrame
        Must contain the columns LATITUDE, LONGITUDE, LATITUDE_B, LONGITUDE_B.
    earth_radius : int or float, optional
        Sphere radius used to scale the central angle. The distance is returned
        in the same unit. Defaults to 6371000 (Earth radius in metres).

    Returns
    -------
    DataFrame
        The input plus the helper columns lat1_rad, lon1_rad, lat2_rad,
        lon2_rad, dlon, dlat, a, c and the final ``distance`` column.
    '''

    # Haversine term: sin^2(dlat/2) + cos(lat1) * cos(lat2) * sin^2(dlon/2)
    hav = (
        f.pow(f.sin(f.col('dlat') / 2), 2)
        + f.cos(f.col('lat1_rad')) * f.cos(f.col('lat2_rad'))
        * f.pow(f.sin(f.col('dlon') / 2), 2)
    )

    # Floating-point rounding can push the term marginally above 1 for
    # (near-)antipodal points, which would make asin() return NaN. Cap it at 1.
    # Null and NaN inputs are deliberately left untouched so they propagate
    # exactly as before.
    hav_capped = f.when((hav > 1.0) & ~f.isnan(hav), f.lit(1.0)).otherwise(hav)

    df_distance = (
        df
        .withColumn('lat1_rad', f.radians(f.col('LATITUDE')))
        .withColumn('lon1_rad', f.radians(f.col('LONGITUDE')))
        .withColumn('lat2_rad', f.radians(f.col('LATITUDE_B')))
        .withColumn('lon2_rad', f.radians(f.col('LONGITUDE_B')))
        .withColumn('dlon', f.col('lon2_rad') - f.col('lon1_rad'))
        .withColumn('dlat', f.col('lat2_rad') - f.col('lat1_rad'))
        .withColumn('a', hav_capped)
        # Central angle between the two points, in radians.
        .withColumn('c', 2 * f.asin(f.sqrt(f.col('a'))))
        .withColumn('distance', f.col('c') * earth_radius)
    )

    return df_distance

In [109]:
# Compute the pairwise distances and keep only the pair identifiers and the distance.
df_distance = (
    haversine_distance(df_full)
    .select('ID', 'ID_B', 'distance')
)

### Filtra as distâncias menores ou iguais ao ponto de corte desejado

In [117]:
# Distance cutoff, in metres: pairs farther apart than this are discarded.
ponto_corte = 5_000

In [118]:
# Keep only the pairs whose distance is within the cutoff.
# The result is cached because several independent actions consume it later
# (vertex and edge construction, toPandas); without caching each of them would
# recompute the full cross join and the Haversine distance.
df_filtrado = df_distance.filter(f.col("distance") <= ponto_corte).cache()

## Criação dos Grafos

### Definição dos Vértices e Arestas

In [119]:
from graphframes import GraphFrame

# Vertices: every ID appearing on either side of a kept pair, de-duplicated.
vertices = (
    df_filtrado.selectExpr("ID AS id")
    .union(df_filtrado.selectExpr("ID_B AS id"))
    .distinct()
)
# Edges: one per kept pair, with the endpoints exposed as src / dst.
arestas = df_filtrado.selectExpr("ID AS src", "ID_B AS dst")

In [120]:
# Build the graph from the vertex (id) and edge (src, dst) DataFrames.
g = GraphFrame(vertices, arestas)



### Encontra os componentes conectados

In [121]:
%%time

# Compute the connected components of the graph; the timing is reported by %%time.
result = g.connectedComponents()



CPU times: user 59.5 ms, sys: 10.9 ms, total: 70.4 ms
Wall time: 44.2 s




### Criação dos Clusters

In [122]:
# Collect the vertex ids of each connected component into a single list per component.
clusters = (
    result
    .groupBy("component")
    .agg(f.collect_list(f.col("id")).alias("points"))
)

# Print the first 10 clusters without truncating the lists.
clusters.show(n=10, truncate=False)



+-----------+------------+
|component  |points      |
+-----------+------------+
|8589934592 |[1669, 1437]|
|25769803776|[2100, 1953]|
|25769803777|[5297, 5187]|
|25769803778|[7, 3]      |
|34359738368|[923, 1445] |
|34359738369|[5207, 3167]|
|34359738370|[3944, 3898]|
|42949672960|[3786, 3743]|
|42949672961|[4534, 4417]|
|60129542144|[2194, 2199]|
+-----------+------------+
only showing top 10 rows



                                                                                

# Visualização

In [126]:
# Edge list of the kept pairs (from, to, weight = distance), collected to the
# driver as a pandas DataFrame.
df_pandas = (
    df_filtrado
    .select('ID', 'ID_B', 'distance')
    .toDF('from', 'to', 'weight')
    .toPandas()
)

                                                                                

In [129]:
# One row per (component, point): explode the list of points, then attach the
# municipality attributes by matching the point to the municipality ID.
clusters_exploded_df = (
    clusters
    .withColumn("point", f.explode("points"))
    .join(df, on=f.col("point") == f.col("ID"))
)

In [130]:
# Bring the clustered municipalities to pandas for plotting.
clusters_pd_df = (
    clusters_exploded_df
    .select("component", "LATITUDE", "LONGITUDE", "MUNICIPIO", "UF")
    .toPandas()
)

                                                                                

In [157]:
import plotly.graph_objects as go

# Empty figure; the traces are layered on below.
fig = go.Figure()

# One density (heat-map) layer per cluster, each point weighted equally.
for component, cluster_data in clusters_pd_df.groupby("component", sort=False):
    heat_layer = go.Densitymapbox(
        lat=cluster_data["LATITUDE"],
        lon=cluster_data["LONGITUDE"],
        z=[1] * cluster_data.shape[0],
        radius=40,
        opacity=0.6,
        showscale=False,
    )
    fig.add_trace(heat_layer)

# Municipality markers, coloured by cluster id and labelled with the name on hover.
fig.add_trace(
    go.Scattermapbox(
        lat=clusters_pd_df["LATITUDE"],
        lon=clusters_pd_df["LONGITUDE"],
        mode='markers',
        marker=go.scattermapbox.Marker(size=9, color=clusters_pd_df["component"]),
        text=clusters_pd_df["MUNICIPIO"],
        hoverinfo='text',
    )
)

# Map layout: OpenStreetMap tiles, centred on the mean coordinate, no margins.
fig.update_layout(
    mapbox={
        "style": "open-street-map",
        "zoom": 4,
        "center": {
            "lat": clusters_pd_df["LATITUDE"].mean(),
            "lon": clusters_pd_df["LONGITUDE"].mean(),
        },
    },
    margin=dict(r=0, t=0, l=0, b=0),
)

# Render the figure.
fig.show()