In [1]:
# Welcome to your new notebook
# Type here in the cell editor to add code!
import pyspark.sql.functions as F
from pyspark.sql.types import *
from notebookutils import mssparkutils
import pandas as pd

# Verificar versión de Spark
print(f"Versión de Spark: {spark.version}")

StatementMeta(, 787d561e-e35a-4ce0-9b0e-18d414904977, 3, Finished, Available, Finished)

Versión de Spark: 3.5.1.5.4.20250914.1


In [2]:
# Definición de esquemas para mejor control de los datos
# Definimos los esquemas de nuestros dataframes para asegurar tipos de datos correctos

# Esquema para clientes
schema_clientes = StructType([
    StructField("ClientID", IntegerType(), False),
    StructField("FirstName", StringType(), True),
    StructField("LastName", StringType(), True),
    StructField("Email", StringType(), True),
    StructField("Region", StringType(), True)
])

# Esquema para brokers
schema_brokers = StructType([
    StructField("BrokerID", IntegerType(), False),
    StructField("BrokerName", StringType(), True),
    StructField("Email", StringType(), True),
    StructField("Region", StringType(), True)
])

# Esquema para campaign
schema_campaign = StructType([
    StructField("CampaignID", IntegerType(), False),
    StructField("Channel", StringType(), True),
    StructField("CampaignName", StringType(), True),
    StructField("StartDate", DateType(), True),
    StructField("EndDate", DateType(), True),
    StructField("BudgetUSD", DoubleType(), True)
])

# Esquema para project
schema_project = StructType([
    StructField("ProjectID", IntegerType(), False),
    StructField("ProjectName", StringType(), True),
    StructField("City", StringType(), True),
    StructField("Region", StringType(), True),
    StructField("LaunchYear", IntegerType(), True),
    StructField("Status", StringType(), True)
])

# Esquema para sale
schema_sale = StructType([
    StructField("SaleID", IntegerType(), False),
    StructField("PropertyID", IntegerType(), False),
    StructField("ClientID", IntegerType(), False),
    StructField("BrokerID", IntegerType(), False),
    StructField("SaleDate", DateType(), True),
    StructField("SalePriceUSD", DoubleType(), True)
])

# Esquema para property
schema_property = StructType([
    StructField("PropertyID", IntegerType(), False),
    StructField("ProjectID", IntegerType(), False),
    StructField("PropertyType", StringType(), True),
    StructField("Size_m2", DoubleType(), True),
    StructField("Bedrooms", IntegerType(), True),
    StructField("Bathrooms", IntegerType(), True),
    StructField("ListPriceUSD", DoubleType(), True),
    StructField("AvailabilityStatus", StringType(), True)
])

# Esquema para lead
schema_lead = StructType([
    StructField("LeadID", IntegerType(), False),
    StructField("ClientID", IntegerType(), False),
    StructField("PropertyID", IntegerType(), False),
    StructField("CampaignID", IntegerType(), False),
    StructField("LeadDate", DateType(), True),
    StructField("LeadSource", StringType(), True)
])


StatementMeta(, 787d561e-e35a-4ce0-9b0e-18d414904977, 4, Finished, Available, Finished)

In [3]:
# Leer los archivos CSV con los esquemas definidos
# Este comando lee los archivos CSV utilizando los esquemas definidos anteriormente

try:
    # Leer archivo de clientes
    df_clientes = spark.read.format("csv") \
        .option("header", "true") \
        .schema(schema_clientes) \
        .load("Files/Raw/clients.csv")

    # Leer archivo de brokers
    df_brokers = spark.read.format("csv") \
        .option("header", "true") \
        .schema(schema_brokers) \
        .load("Files/Raw/brokers.csv")

    # Leer archivo de projects
    df_projects = spark.read.format("csv") \
        .option("header", "true") \
        .schema(schema_project) \
        .load("Files/Raw/projects.csv")

    # Leer archivo de properties
    df_properties = spark.read.format("csv") \
        .option("header", "true") \
        .schema(schema_property) \
        .load("Files/Raw/properties.csv")

    # Leer archivo de campaigns
    df_campaigns = spark.read.format("csv") \
        .option("header", "true") \
        .schema(schema_campaign) \
        .load("Files/Raw/campaigns.csv")

    # Leer archivo de leads
    df_leads = spark.read.format("csv") \
        .option("header", "true") \
        .schema(schema_lead) \
        .load("Files/Raw/leads.csv")
   
    # Leer archivo de sales
    df_sales = spark.read.format("csv") \
        .option("header", "true") \
        .schema(schema_sale) \
        .load("Files/Raw/sales.csv")

    print("✅ Archivos CSV leídos correctamente")
except Exception as e:
    print(f"Error al leer archivos: {str(e)}")

StatementMeta(, 787d561e-e35a-4ce0-9b0e-18d414904977, 5, Finished, Available, Finished)

✅ Archivos CSV leídos correctamente


In [4]:
# Exploración de los datos
# Mostramos una vista previa de los datos cargados

print("Vista previa de los datos de clientes:")
display(df_clientes.limit(5))

print("Vista previa de los datos de brokers:")
display(df_brokers.limit(5))

print("Vista previa de los datos de projects:")
display(df_projects.limit(5))

print("Vista previa de los datos de properties:")
display(df_properties.limit(5))

print("Vista previa de los datos de campaigns:")
display(df_campaigns.limit(5))

print("Vista previa de los datos de leads:")
display(df_leads.limit(5))

print("Vista previa de los datos de sales:")
display(df_sales.limit(5))

StatementMeta(, ba2c1858-6b31-40cc-9935-fe7a444e0cc4, 6, Finished, Available, Finished)

Vista previa de los datos de clientes:


SynapseWidget(Synapse.DataFrame, fbd71f44-41c5-47c5-9bf1-15d27e076e65)

Vista previa de los datos de brokers:


SynapseWidget(Synapse.DataFrame, bf0f098a-3093-4cd4-8a0e-ba3cb04228bf)

Vista previa de los datos de projects:


SynapseWidget(Synapse.DataFrame, bcf01b3d-7120-498a-9a0d-8990ec092361)

Vista previa de los datos de properties:


SynapseWidget(Synapse.DataFrame, 02eeb713-8f9b-41f3-8962-34403f3af209)

Vista previa de los datos de campaigns:


SynapseWidget(Synapse.DataFrame, 18321e12-6962-4c63-9eca-625861fe6e4b)

Vista previa de los datos de leads:


SynapseWidget(Synapse.DataFrame, 5f21ee27-bf09-48b6-9239-759ba633de2d)

Vista previa de los datos de sales:


SynapseWidget(Synapse.DataFrame, e4b37c95-c0d4-4171-8697-4aac87f64a39)

In [4]:
# Guardar los dataframes como tablas Delta
# Guardamos los dataframes procesados como tablas Delta en el lakehouse

try:
    # Guardar clientes
    df_clientes.write.format("delta").mode("overwrite").save("Files/Silver/clientes_silver")
    
    # Guardar brokers
    df_brokers.write.format("delta").mode("overwrite").save("Files/Silver/brokers_silver")
    
    # Guardar projects
    df_projects.write.format("delta").mode("overwrite").save("Files/Silver/projects_silver")
    
    # Guardar properties
    df_properties.write.format("delta").mode("overwrite").save("Files/Silver/properties_silver")
    
    # Guardar campaigns
    df_campaigns.write.format("delta").mode("overwrite").save("Files/Silver/campaigns_silver")

    # Guardar leads
    df_leads.write.format("delta").mode("overwrite").save("Files/Silver/leads_silver")
    
    # Guardar sales
    df_sales.write.format("delta").mode("overwrite").save("Files/Silver/sales_silver")
    
    print("✅ Dataframes guardados como archivos Delta correctamente")
except Exception as e:
    print(f"Error al guardar archivos Delta: {str(e)}")

StatementMeta(, 787d561e-e35a-4ce0-9b0e-18d414904977, 6, Finished, Available, Finished)

✅ Dataframes guardados como archivos Delta correctamente
