### Practica 2 

Eres el arquitecto de datos principal en una empresa global de retail que está migrando su plataforma de análisis de datos a Azure. El proyecto implica procesar terabytes de datos de ventas y clientes, implementar modelos de ML para predicción de demanda y crear un pipeline de datos en tiempo real para análisis de inventario.

Estructura del proyecto

In [None]:
azure_retail_analytics/
│
├── data/
│   ├── raw/
│   ├── processed/
│   └── models/
│
├── src/
│   ├── etl/
│   │   ├── extract_from_blob.py
│   │   ├── transform_with_databricks.py
│   │   └── load_to_synapse.py
│   │
│   ├── ml/
│   │   ├── train_demand_forecast.py
│   │   └── deploy_to_aks.py
│   │
│   └── stream/
│       └── inventory_analysis.py
│
├── iac/
│   ├── main.tf
│   └── variables.tf
│
├── adf/
│   └── pipelines/
│       ├── daily_sales_etl.json
│       └── ml_model_training.json
│
├── notebooks/
│   └── exploratory_analysis.ipynb
│
├── tests/
│   ├── test_etl.py
│   └── test_ml.py
│
├── .gitignore
├── requirements.txt
└── README.md

1. Inicia un nuevo repositorio:

2. Configura tu nombre de usuario y email para este repositorio

3. Crea un archivo .gitignore con el siguiente contenido:

In [None]:
# Python
__pycache__/
*.py[cod]

# Notebooks
.ipynb_checkpoints

# Azure
.azure

# Terraform
.terraform
*.tfstate
*.tfstate.*

# Credenciales
*.key
*.pem

Explicar el contenido del gitignore.

4. Añade y realiza el commit inicial:

#### Desarrollo de las ramas principales

Trabajaremos con 4 ramas principales: etl, ml, iac y stream.

#### Rama ETL

1. Crear y cambiarse a la rama etl

2. Crea los archivos ETL en  src/etl/:

`extract_from_blob.py`:

In [None]:
from azure.storage.blob import BlobServiceClient

def extract_data(container_name, blob_name):
    connection_string = "your_connection_string"
    blob_service_client = BlobServiceClient.from_connection_string(connection_string)
    blob_client = blob_service_client.get_blob_client(container=container_name, blob=blob_name)
    
    return blob_client.download_blob().readall()

`transform_with_databricks.py`:

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

def transform_data(df):
    spark = SparkSession.builder.appName("TransformSalesData").getOrCreate()
    
    transformed_df = df.withColumn("total_amount", col("quantity") * col("unit_price")) \
                      .groupBy("date", "product_id") \
                      .agg(sum("total_amount").alias("daily_sales"))
    
    return transformed_df

`load_to_synapse.py`:

In [None]:
from azure.synapse.artifacts.models import SqlScript

def load_data(workspace_name, sql_pool_name, table_name, data):
    synapse_client = SqlScript(workspace_name)
    
	    insert_query = f"INSERT INTO {table_name} VALUES {','.join(map(str, data))}"
    synapse_client.execute_sql_script(sql_pool_name, insert_query)

3. Añade y realiza commits de estos archivos.

#### Rama ML

1. Cambiarse a la rama principal y luego crear la rama `ml` :

2. Crea los archivos ML en `src/ml/` :

`train_demand_forecast.py`:

In [None]:
from azureml.core import Workspace, Experiment, ScriptRunConfig

def train_model():
    ws = Workspace.from_config()
    experiment = Experiment(workspace=ws, name="demand-forecast")
    config = ScriptRunConfig(source_directory=".", script="train.py", compute_target="cpu-cluster")
    
    run = experiment.submit(config)
    run.wait_for_completion(show_output=True)

`deploy_to_aks.py`:

In [None]:
from azureml.core import Model
from azureml.core.webservice import AksWebservice, Webservice

def deploy_model(model_name, service_name):
    ws = Workspace.from_config()
    model = Model(ws, name=model_name)
    
    aks_config = AksWebservice.deploy_configuration()
    aks_service = Model.deploy(ws, service_name, [model], aks_config)
    aks_service.wait_for_deployment(show_output=True)

3. Añadir y confirmar los archivos anteriores.

#### Rama Stream

1. Cambiarse a la rama principal y crear la rama `stream` :

`inventory_analysis.py`:

In [None]:
from azure.eventhub import EventHubConsumerClient
from azure.eventhub.aio import EventHubConsumerClient
import asyncio

async def process_event(partition_context, event):
    print("Received event from partition: {}".format(partition_context.partition_id))
    print("Event data: {}".format(event.body_as_str()))
    await partition_context.update_checkpoint(event)

async def main():
    client = EventHubConsumerClient.from_connection_string(
        conn_str="<connection_string>",
        consumer_group="$Default",
        eventhub_name="<eventhub_name>"
    )
    async with client:
        await client.receive(on_event=process_event, starting_position="-1")

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

3. Añadir y confirmar cambios de este archivo

#### Rama IaC

1. Cambia a la rama principal y crear la rama `iac`:

2. Crea los archivos de Terraform en `iac/`:

`main.tf`:

In [None]:
provider "azurerm" {
  features {}
}

resource "azurerm_resource_group" "rg" {
  name     = var.resource_group_name
  location = var.location
}

resource "azurerm_storage_account" "storage" {
  name                     = var.storage_account_name
  resource_group_name      = azurerm_resource_group.rg.name
  location                 = azurerm_resource_group.rg.location
  account_tier             = "Standard"
  account_replication_type = "LRS"
}

resource "azurerm_synapse_workspace" "synapse" {
  name                                 = var.synapse_workspace_name
  resource_group_name                  = azurerm_resource_group.rg.name
  location                             = azurerm_resource_group.rg.location
  storage_data_lake_gen2_filesystem_id = azurerm_storage_data_lake_gen2_filesystem.datalake.id
  sql_administrator_login              = var.synapse_sql_admin
  sql_administrator_login_password     = var.synapse_sql_admin_password
}

`variables.tf`:

In [None]:
variable "resource_group_name" {
  type = string
}

variable "location" {
  type = string
}

variable "storage_account_name" {
  type = string
}

variable "synapse_workspace_name" {
  type = string
}

variable "synapse_sql_admin" {
  type = string
}

variable "synapse_sql_admin_password" {
  type = string
}

3. Añadir y confirmar los cambios.

#### Fusión de ramas

1. Vuelve a la rama principal

2. Fusiona las ramas en la rama principal:

3. Elimina las ramas una vez fusionadas:

### Mas preguntas:

1. ¿Qué comando usarías para ver el historial de commits de forma concisa?

2. Si cometiste un error en el último mensaje de commit, ¿cómo lo corregirías sin crear un nuevo commit?

3. ¿Cómo deshaces los cambios en un archivo que aún no ha sido añadido al área de staging?

4. Si añadiste un archivo al área de staging por error, ¿cómo lo removerías sin perder los cambios en el archivo?

5. ¿Qué comando usarías para deshacer el último commit manteniendo los cambios en tu directorio de trabajo?

6. ¿Cómo verías el historial completo de acciones realizadas en el repositorio, incluyendo commits revertidos o borrados?

7. Si eliminaste un archivo por error y ya hiciste commit, ¿cómo lo recuperarías?

8. Cómo moverías un archivo a una nueva ubicación en el repositorio y registrarías este cambio en Git?

9. Si quieres eliminar un archivo del repositorio pero mantenerlo en tu sistema de archivos local, ¿qué comando usarías?

10. ¿Cómo fusionarías la rama 'feature' en la rama 'main', asumiendo que no hay conflictos?