In [None]:
# Create the resource group that holds every resource created in this lab.
# NOTE(review): only this cell and the PostgreSQL create cell pass --subscription; the
# other az commands use the CLI's default subscription, so run
# `az account set --subscription <id>` first if your default differs.
!az group create --name rg-adf-james --location eastus --subscription 33266a41-134d-4de7-a780-665b38b0f7b8

In [None]:
# Create the Data Factory instance (adf-james) inside rg-adf-james.
# NOTE(review): `az datafactory` comes from an Azure CLI extension; the CLI may prompt
# to install it the first time this runs.
!az datafactory create \
    --resource-group rg-adf-james \
    --factory-name adf-james \
    --location eastus


In [None]:
# Sanity check: list the factories in the resource group to confirm adf-james exists.
!az datafactory list --resource-group rg-adf-james

In [None]:
# Create a storage account
# Standard_LRS account that will hold the CSV input file for the copy pipeline.
# Storage account names are globally unique across Azure; change jamesaccount2025 if taken.
!az storage account create \
    --resource-group rg-adf-james \
    --name jamesaccount2025 \
    --location eastus \
    --sku Standard_LRS

In [None]:
# Create a container
# Blob container that receives lab2data.csv; the ADF CSV dataset later points at it.
# NOTE(review): no --auth-mode is given, so the CLI chooses how to authenticate
# (typically by looking up the account key) — confirm this works for your login.
!az storage container create \
    --account-name jamesaccount2025 \
    --name inputcontainer



In [None]:
# Upload lab2data.csv
!az storage blob upload \
    --account-name jamesaccount2025 \
    --container-name inputcontainer \
    --name lab2data.csv \
    --file lab2data.csv

In [None]:
# Create Azure Database for PostgreSQL flexible server
# NOTE(review): this server is created in canadacentral while the resource group, data
# factory and storage account use eastus — confirm this is intentional (e.g. regional
# capacity limits), since every copy then crosses regions.
# SECURITY: the admin password is passed in plain text, so it is saved in this notebook
# and the shell history; `--public-access all` also accepts connections from any IP.
!az postgres flexible-server create --subscription 33266a41-134d-4de7-a780-665b38b0f7b8 --resource-group rg-adf-james --name adfpostgres --location canadacentral --version 16 --zone 1 --password-auth enabled --admin-user adfadmin --admin-password adf@min2025 --tier GeneralPurpose --sku-name Standard_D2s_v3 --storage-type premium_lrs --storage-size 128 --performance-tier P10 --storage-auto-grow enabled --high-availability disabled --public-access all

In [None]:
# Open the server firewall to every IPv4 address (0.0.0.0 - 255.255.255.255) so that
# both this machine and Data Factory can connect.
# SECURITY: this exposes the database to the whole internet, protected only by the
# password. Outside a throwaway lab, restrict it to your client IP plus the
# "allow Azure services" rule (start and end 0.0.0.0).
# NOTE(review): probably redundant with `--public-access all` in the create cell.
!az postgres flexible-server firewall-rule create \
    --resource-group rg-adf-james \
    --name adfpostgres \
    --rule-name AllowAll \
    --start-ip-address 0.0.0.0 \
    --end-ip-address 255.255.255.255

In [None]:
# Install postgresql if you haven't already
# Uses Chocolatey (a Windows package manager) to get the psql client used by the next
# cells, so this only works on Windows, typically from an elevated (administrator) shell.
# NOTE(review): psql may not be on the kernel's PATH until the kernel is restarted.
!choco install -y postgresql

In [None]:
!psql "sslmode=require host=adfpostgres.postgres.database.azure.com user=adfadmin dbname=postgres" -c "CREATE TABLE public.sample_data (id SERIAL PRIMARY KEY, name VARCHAR(100), value INT);"

In [None]:
# Seed two sample rows. id is SERIAL and not supplied, so every re-run appends two more rows.
# NOTE(review): like the CREATE TABLE cell, psql needs PGPASSWORD/.pgpass or will prompt.
!psql "sslmode=require host=adfpostgres.postgres.database.azure.com user=adfadmin dbname=postgres" -c "INSERT INTO public.sample_data(name, value) VALUES ('Test1',123),('Test2',456);"

In [None]:
# Verify the data
# SECURITY: unlike the two cells above, this connection string embeds the admin password
# in plain text; prefer PGPASSWORD or a .pgpass file so the secret is not saved here.
!psql "sslmode=require host=adfpostgres.postgres.database.azure.com user=adfadmin password=adf@min2025 dbname=postgres" -c "SELECT * FROM public.sample_data"

In [None]:
!pip install azure-mgmt-datafactory azure-identity

In [34]:
import getpass
import json
import os
import shutil
import subprocess
from datetime import datetime, timedelta, timezone

import requests
from azure.identity import AzureCliCredential
from azure.mgmt.datafactory import DataFactoryManagementClient
from azure.mgmt.datafactory.models import (
    Activity,
    AzureBlobStorageLocation,
    AzurePostgreSqlLinkedService,
    AzurePostgreSqlSink,
    AzurePostgreSqlTableDataset,
    AzureStorageLinkedService,
    CopyActivity,
    Dataset,
    DatasetCompression,
    DatasetReference,
    DatasetResource,
    DatasetStorageFormat,
    DelimitedTextDataset,
    DelimitedTextSource,
    LinkedService,
    LinkedServiceReference,
    LinkedServiceResource,
    PipelineResource,
    PostgreSqlTableDataset,
)

In [38]:
# 1. Configure Credentials and Clients
# AzureCliCredential reuses the identity you signed in with via `az login`.
credential = AzureCliCredential()

# Replace with your actual values
subscription_id = "33266a41-134d-4de7-a780-665b38b0f7b8"
resource_group = "rg-adf-james"
factory_name = "adf-james"
storage_account_name = "jamesaccount2025"
# Fully qualified host name of the flexible server (used by the ADF linked service).
postgres_server_name = "adfpostgres.postgres.database.azure.com"
postgres_admin_user = "adfadmin"
# Never hard-code the admin password in a notebook: it ends up in version control and in
# every shared copy. Read it from POSTGRES_ADMIN_PASSWORD, or prompt for it. It must match
# the --admin-password the flexible server was created with.
postgres_admin_password = os.environ.get("POSTGRES_ADMIN_PASSWORD") or getpass.getpass(
    "PostgreSQL admin password: "
)
storage_container_name = "inputcontainer"
storage_file_name = "lab2data.csv"
table_name = "public.sample_data"
location = "eastus"

df_client = DataFactoryManagementClient(credential, subscription_id)

In [None]:
# 2.1 Create Azure Storage Linked Service
# ADF needs real credentials for the account. A connection string containing only
# AccountName/EndpointSuffix (no AccountKey) cannot authenticate, so fetch the full
# connection string from the Azure CLI. az is run via subprocess with a list of
# arguments (no shell), and the secret is never printed or written into the notebook.
az_executable = shutil.which("az")
if az_executable is None:
    raise RuntimeError("Azure CLI ('az') was not found on PATH")
conn_result = subprocess.run(
    [
        az_executable, "storage", "account", "show-connection-string",
        "--name", storage_account_name,
        "--resource-group", resource_group,
        "--query", "connectionString",
        "--output", "tsv",
    ],
    capture_output=True,
    text=True,
)
storage_connection_string = conn_result.stdout.strip()
if conn_result.returncode != 0 or not storage_connection_string:
    raise RuntimeError(
        f"Could not read the connection string for {storage_account_name}: "
        f"{conn_result.stderr.strip()}"
    )

storage_linked_service = LinkedServiceResource(
    properties=AzureStorageLinkedService(
        # Typed as SecureString so ADF treats the connection string as a secret.
        connection_string={
            "type": "SecureString",
            "value": storage_connection_string,
        }
    )
)
print("Creating storage linked service...")
ls_storage = df_client.linked_services.create_or_update(
    resource_group, factory_name, "AzureStorageLinkedService", storage_linked_service
)

print(f"storage linked service created {ls_storage.name}")

In [None]:
# 2.2 Create Azure PostgreSQL Linked Service
# Registers the flexible server (host, database, admin login from the config cell) with
# the factory under the name "AzurePostgreSqlLinkedService".
# NOTE(review): the server/database/username keywords exist only in newer
# azure-mgmt-datafactory releases (older ones expose connection_string instead) —
# confirm the installed SDK accepts them, otherwise they are ignored with a warning.
pg_password_secret = {"type": "SecureString", "value": postgres_admin_password}
pg_properties = AzurePostgreSqlLinkedService(
    server=postgres_server_name,
    database="postgres",
    username=postgres_admin_user,
    password=pg_password_secret,
)
postgres_linked_service = LinkedServiceResource(properties=pg_properties)

print("Creating postgres linked service...")
ls_postgres = df_client.linked_services.create_or_update(
    resource_group,
    factory_name,
    "AzurePostgreSqlLinkedService",
    postgres_linked_service,
)
print(f"postgres linked service created {ls_postgres.name}")

In [None]:
# 3.1 Create Azure Storage CSV Dataset
# Describes <storage_container_name>/<storage_file_name> as a comma-separated file whose
# first row is a header, read through the storage linked service from 2.1.
storage_dataset = DatasetResource(
    properties=DelimitedTextDataset(
        # A dataset points at its linked service with a LinkedServiceReference. The
        # abstract LinkedService model has no reference_name attribute, so the
        # reference was dropped and the request was malformed.
        linked_service_name=LinkedServiceReference(
            reference_name="AzureStorageLinkedService",
            type="LinkedServiceReference",
        ),
        # Location must be a typed location object. The camelCase folderPath/fileName
        # keywords (and location_type/compression) are not model attributes and were
        # silently ignored, leaving the dataset with no file to read.
        location=AzureBlobStorageLocation(
            container=storage_container_name,
            file_name=storage_file_name,
        ),
        column_delimiter=",",
        # row_delimiter is left unset on purpose: on read, ADF's default accepts
        # "\r\n", "\r" or "\n", which also handles CSVs saved on Windows.
        first_row_as_header=True,
    )
)
print("Creating CSV dataset...")
ds_csv = df_client.datasets.create_or_update(
    resource_group, factory_name, "AzureBlobCSVDataset", storage_dataset
)
print(f"csv dataset created {ds_csv.name}")

In [None]:
# 3.2 Create Azure PostgreSQL Table Dataset
# Describes the target table (schema-qualified name from the config cell) reached through
# the Azure PostgreSQL linked service from 2.2.
postgres_dataset = DatasetResource(
    # AzurePostgreSqlTableDataset ("AzurePostgreSqlTable") is the dataset type paired
    # with the AzurePostgreSql linked service; PostgreSqlTableDataset belongs to the
    # generic PostgreSql connector.
    properties=AzurePostgreSqlTableDataset(
        # Must be a LinkedServiceReference; the abstract LinkedService model drops
        # reference_name and produces an invalid request body.
        linked_service_name=LinkedServiceReference(
            reference_name="AzurePostgreSqlLinkedService",
            type="LinkedServiceReference",
        ),
        table_name=table_name,
    )
)
print("Creating postgres dataset...")
ds_postgres = df_client.datasets.create_or_update(
    resource_group, factory_name, "AzurePostgreSqlDataset", postgres_dataset
)
print(f"postgres dataset created {ds_postgres.name}")

In [None]:
# 4. Create Copy Activity
# Copies rows from the CSV dataset (3.1) into the PostgreSQL table dataset (3.2).
copy_activity = CopyActivity(
    name="CopyDataFromBlobToPostgres",
    inputs=[DatasetReference(reference_name="AzureBlobCSVDataset", type="DatasetReference")],
    outputs=[DatasetReference(reference_name="AzurePostgreSqlDataset", type="DatasetReference")],
    source=DelimitedTextSource(),
    # Sink batching: write_batch_size rows per insert batch, write_batch_timeout as the
    # per-batch time limit (timespan string "hh:mm:ss").
    sink=AzurePostgreSqlSink(write_batch_timeout="00:05:00", write_batch_size=10000),
)

# Create a pipeline with the copy activity
pipeline_name = "copyPipeline"
params_for_pipeline = {}  # this pipeline takes no runtime parameters
pipeline_obj = PipelineResource(activities=[copy_activity], parameters=params_for_pipeline)
pipeline = df_client.pipelines.create_or_update(resource_group, factory_name, pipeline_name, pipeline_obj)
print(f"Pipeline created: {pipeline.name}")


In [None]:
# 5. Trigger a run of the pipeline through the ADF REST API (createRun).
# Reuse the AzureCliCredential from the config cell to get an ARM token instead of
# shelling out to `az account get-access-token` and parsing its text output.
arm_token = credential.get_token("https://management.azure.com/.default").token
# `headers` is reused by the run-query cell below.
headers = {
    "Authorization": f"Bearer {arm_token}",
    "Content-Type": "application/json"
}

url = f"https://management.azure.com/subscriptions/{subscription_id}/resourceGroups/{resource_group}/providers/Microsoft.DataFactory/factories/{factory_name}/pipelines/{pipeline_name}/createRun?api-version=2018-06-01"
resp = requests.post(url, headers=headers, json={}, timeout=30)
# Fail loudly on 4xx/5xx instead of printing an error body as if the run had started.
resp.raise_for_status()
run_id = resp.json()["runId"]
print("Run pipeline response:", resp.json())

In [None]:
# Query recent runs of this pipeline. The window is anchored to the current UTC time;
# a fixed historical window (e.g. 2023 -> 2024) can never contain runs triggered today.
url_runs = f"https://management.azure.com/subscriptions/{subscription_id}/resourceGroups/{resource_group}/providers/Microsoft.DataFactory/factories/{factory_name}/queryPipelineRuns?api-version=2018-06-01"
now_utc = datetime.now(timezone.utc)
iso_fmt = "%Y-%m-%dT%H:%M:%SZ"
payload = {
    "lastUpdatedAfter": (now_utc - timedelta(days=1)).strftime(iso_fmt),
    # Small look-ahead so a run updated while this request is in flight is still included.
    "lastUpdatedBefore": (now_utc + timedelta(minutes=5)).strftime(iso_fmt),
    "filters": [
        {"operand": "PipelineName", "operator": "Equals", "values": [pipeline_name]}
    ],
}
runs_resp = requests.post(url_runs, headers=headers, json=payload, timeout=30)
runs_resp.raise_for_status()
pipeline_runs = runs_resp.json().get("value", [])
print(f"Pipeline runs for {pipeline_name} in the last day: {len(pipeline_runs)}")
for run in pipeline_runs:
    print(run.get("runId"), run.get("status"), run.get("runStart"), run.get("runEnd"), run.get("message", ""))