#Sample Shock Analysis Based on a SAM Table

In [0]:
import numpy as np
import pandas as pd

Step 1. Define the SAM matrix.

In [0]:
# Adjusted 5x5 SAM data with zeros on the main diagonal
sam_matrix = np.array([
    [0, 50, 30, 60, 20],
    [40, 0, 70, 90, 30],
    [30, 50, 0, 80, 40],
    [60, 90, 80, 0, 60],
    [20, 30, 40, 60, 0]
])
# Create a Pandas DataFrame
columns = ['Agriculture', 'Manufacturing', 'Services', 'Households', 'Government']
sam_df = pd.DataFrame(sam_matrix, columns=columns, index=columns)
display(sam_df)

Agriculture,Manufacturing,Services,Households,Government
0,50,30,60,20
40,0,70,90,30
30,50,0,80,40
60,90,80,0,60
20,30,40,60,0


In [0]:
from pyspark.sql.types import IntegerType, StructType, StructField

# Column names
columns = ['Agriculture', 'Manufacturing', 'Services', 'Households', 'Government']

# Convert the NumPy array to a list of tuples and cast the values to int
sam_list = [tuple(int(value) for value in row) for row in sam_matrix]

# Create the schema for the Spark DataFrame
schema = StructType([StructField(column, IntegerType(), nullable=True) for column in columns])

# Create a Spark DataFrame directly from the list of tuples and the schema
sam_spark_df = spark.createDataFrame(sam_list, schema=schema)

# Display the Spark DataFrame
display(sam_spark_df)

Agriculture,Manufacturing,Services,Households,Government
0,50,30,60,20
40,0,70,90,30
30,50,0,80,40
60,90,80,0,60
20,30,40,60,0


Step 2. Convert the SAM matrix into a Databricks dataframe. Use the long format.

In [0]:
# Convert to long format (similar to Pandas melt)
from pyspark.sql.functions import col, lit, array, struct, explode

# Convert wide format to long format
long_format_df = sam_spark_df.select(
    array([lit(c).alias("Column") for c in columns]).alias("Column"),
    struct([col(c).alias(c) for c in columns]).alias("Values")
).select(explode(col("Column")).alias("Column"), col("Values.*")).withColumnRenamed("Column", "Row")

long_format_df = long_format_df.selectExpr(
    "Row",
    "stack(5, 'Agriculture', Agriculture, 'Manufacturing', Manufacturing, 'Services', Services, 'Households', Households, 'Government', Government) as (Column, Value)"
)

# Display the long format Spark DataFrame
display(long_format_df)

Row,Column,Value
Agriculture,Agriculture,0
Agriculture,Manufacturing,50
Agriculture,Services,30
Agriculture,Households,60
Agriculture,Government,20
Manufacturing,Agriculture,0
Manufacturing,Manufacturing,50
Manufacturing,Services,30
Manufacturing,Households,60
Manufacturing,Government,20


Step 3. Normalize and regularize the SAM: The SAM is normalized to get the technical coefficients matrix and then regularized to ensure numerical stability and handle potential singularities. 

In [0]:
# Normalize the SAM matrix by dividing each element by the sum of its column
column_sums = sam_matrix.sum(axis=0)
normalized_sam = sam_matrix / column_sums

# Regularization parameter
epsilon = 1e-10

# Add regularization to the diagonal - the np.eye() function generates an identity matrix
regularized_matrix = normalized_sam + np.eye(normalized_sam.shape[0]) * epsilon

Step 4. Compute the Leontief Inverse: The Leontief inverse is computed with regularization to handle potential singularity.

In [0]:
# Compute the inverse of the normalized SAM matrix
try:
    inverse_sam = np.linalg.inv(regularized_matrix)
except np.linalg.LinAlgError as e:
    # The matrix is not invertible. Compute the Moore-Penrose pseudoinverse instead. 
    inverse_sam = np.linalg.pinv(regularized_matrix)

Step 5. Perform Shock Analysis: A 10% increase in final demand for the Agriculture sector is introduced, and the impact on all sectors is calculated using the Leontief inverse.

In [0]:
# Perform Shock Analysis: 10% increase in final demand for Agriculture
shock_vector = np.array([0.10, 0, 0, 0, 0])

# Calculate the impact of the shock
impact_vector = inverse_sam @ shock_vector
print(impact_vector)

[-0.52979651  0.09912791  0.22063954  0.08219477  0.2278343 ]


Convert the impact vector to a dataframe for display.

In [0]:
# Convert the impact_vector to a Spark DataFrame
sectors = ['Agriculture', 'Manufacturing', 'Services', 'Households', 'Government']
impact_df = spark.createDataFrame([(sectors[i], float(impact_vector[i])) for i in range(len(sectors))], ["Sector", "Impact"])

# Display the Spark DataFrame
display(impact_df)

Sector,Impact
Agriculture,-0.5297965120309741
Manufacturing,0.0991279069812021
Services,0.2206395350563949
Households,0.0821947674368128
Government,0.2278343025465641


Databricks visualization. Run in Databricks to view.

Step 1. Create the SAM DataFrame: The SAM is created with zeros on the main diagonal and loaded into a Pandas dataframe. The Pandas dataframe format is the easiest format to represent the SAM matrix as a table.


In [0]:
import pandas as pd

# Adjusted 5x5 SAM data with zeros on the main diagonal
data = np.array([
    [0, 50, 30, 60, 20],
    [40, 0, 70, 90, 30],
    [30, 50, 0, 80, 40],
    [60, 90, 80, 0, 60],
    [20, 30, 40, 60, 0]
])
# Create a Pandas DataFrame
columns = ['Agriculture', 'Manufacturing', 'Services', 'Households', 'Government']
sam_df = pd.DataFrame(data, columns=columns, index=columns)
display(sam_df)

Agriculture,Manufacturing,Services,Households,Government
0,50,30,60,20
40,0,70,90,30
30,50,0,80,40
60,90,80,0,60
20,30,40,60,0


Step 2. Convert the SAM matrix into a Databricks dataframe.

In [0]:
from pyspark.sql.types import IntegerType, StructType, StructField

# Column names
columns = ['Agriculture', 'Manufacturing', 'Services', 'Households', 'Government']

# Convert the NumPy array to a list of tuples and cast the values to int
sam_list = [tuple(int(value) for value in row) for row in data]

# Create the schema for the Spark DataFrame
schema = StructType([StructField(column, IntegerType(), nullable=True) for column in columns])

# Create a Spark DataFrame directly from the list of tuples and the schema
sam_spark_df = spark.createDataFrame(sam_list, schema=schema)

# Display the Spark DataFrame
display(sam_spark_df)

Agriculture,Manufacturing,Services,Households,Government
0,50,30,60,20
40,0,70,90,30
30,50,0,80,40
60,90,80,0,60
20,30,40,60,0


Convert to Long Format: The Spark DataFrame is transformed to a long format using Spark SQL functions. This involves selecting columns, creating an array of column names, exploding the array to create rows, and stacking the values.

The long format offers several advantages in data analysis, manipulation, and visualization. It is more flexible, easier to work with, and compatible with many data processing and visualization tools

In [0]:
# Convert to long format (similar to Pandas melt)
from pyspark.sql.functions import col, lit, array, struct, explode

# Convert wide format to long format
long_format_df = sam_spark_df.select(
    array([lit(c).alias("Column") for c in columns]).alias("Column"),
    struct([col(c).alias(c) for c in columns]).alias("Values")
).select(explode(col("Column")).alias("Column"), col("Values.*")).withColumnRenamed("Column", "Row")

long_format_df = long_format_df.selectExpr(
    "Row",
    "stack(5, 'Agriculture', Agriculture, 'Manufacturing', Manufacturing, 'Services', Services, 'Households', Households, 'Government', Government) as (Column, Value)"
)

# Display the long format Spark DataFrame
display(long_format_df)

Row,Column,Value
Agriculture,Agriculture,0
Agriculture,Manufacturing,50
Agriculture,Services,30
Agriculture,Households,60
Agriculture,Government,20
Manufacturing,Agriculture,0
Manufacturing,Manufacturing,50
Manufacturing,Services,30
Manufacturing,Households,60
Manufacturing,Government,20


Step 3. Normalize the SAM: The SAM is normalized to get the technical coefficients matrix (A).

In [0]:
# Normalize the SAM to obtain technical coefficients matrix (A)
# Convert to numpy array for normalization
sam_array = data

# Normalize by column sums to get technical coefficients matrix (A)
column_sums = sam_array.sum(axis=0)
A = sam_array / column_sums

Step 4. Convert Normalized SAM to Spark DataFrame: The normalized SAM is converted back to a Spark DataFrame and displayed.

In [0]:
# Convert back to Spark DataFrame -- with a temporary conversion into pandas dataframe
A_df = spark.createDataFrame(pd.DataFrame(A, columns=columns, index=columns).reset_index().melt(id_vars=['index'], var_name='Column', value_name='Value'))
A_df = A_df.withColumnRenamed('index', 'Row')
display(A_df)

Row,Column,Value
Agriculture,Agriculture,0.0
Manufacturing,Agriculture,0.2666666666666666
Services,Agriculture,0.2
Households,Agriculture,0.4
Government,Agriculture,0.1333333333333333
Agriculture,Manufacturing,0.2272727272727272
Manufacturing,Manufacturing,0.0
Services,Manufacturing,0.2272727272727272
Households,Manufacturing,0.4090909090909091
Government,Manufacturing,0.1363636363636363


Step 5. Compute the Leontief Inverse: The Leontief inverse is computed with regularization to handle potential singularity.

In [0]:
# Compute the Leontief inverse
regularized_matrix = A + np.eye(normalized_sam.shape[0]) * epsilon
# np.eye creates an identity matrix of the same size as A (using normalized_sam as the reference for dimensions) and multiplies it by ϵ.

# compute the inverse of the square matrix
try:
    leontief_inverse = np.linalg.inv(regularized_matrix)
except np.linalg.LinAlgError:
    leontief_inverse = np.linalg.pinv(regularized_matrix)


Step 6. Convert Leontief Inverse to Spark DataFrame: The Leontief inverse is converted to a Spark DataFrame and displayed.

In [0]:
leontief_inverse_df = spark.createDataFrame(pd.DataFrame(leontief_inverse, columns=columns, index=columns).reset_index().melt(id_vars=['index'], var_name='Column', value_name='Value'))
leontief_inverse_df = leontief_inverse_df.withColumnRenamed('index', 'Row')
display(leontief_inverse_df)

Row,Column,Value
Agriculture,Agriculture,-5.29796512030974
Manufacturing,Agriculture,0.9912790698120216
Services,Agriculture,2.2063953505639486
Households,Agriculture,0.8219476743681284
Government,Agriculture,2.278343025465641
Agriculture,Manufacturing,-0.5014534898095933
Manufacturing,Manufacturing,-2.8032945750197493
Services,Manufacturing,1.5668604661655774
Households,Manufacturing,0.7938468991965968
Government,Manufacturing,1.944040699367168


Step 7. Calculate Multipliers: The multipliers are calculated by summing the columns of the Leontief inverse, converted to Dataframe and displayed.

In [0]:
# Calculate multipliers
multipliers = leontief_inverse.sum(axis=0)
multipliers_df = spark.createDataFrame(pd.DataFrame(multipliers, columns=["Multiplier"], index=columns).reset_index().rename(columns={'index': 'Sector'}))
display(multipliers_df)

Sector,Multiplier
Agriculture,0.9999999998999992
Manufacturing,0.9999999998999995
Services,0.9999999999
Households,0.9999999999
Government,0.9999999999


Step 8. Perform Shock Analysis: A 10% increase in final demand for the Agriculture sector is introduced, and the impact on all sectors is calculated using the Leontief inverse.

In [0]:
# Perform Shock Analysis: 10% increase in final demand for Agriculture
shock_vector = np.zeros(5)
shock_vector[0] = 0.10  # 10% increase in Agriculture

# Calculate the impact of the shock
impact_vector = leontief_inverse @ shock_vector
impact_df = spark.createDataFrame(pd.DataFrame(impact_vector, columns=["Impact"], index=columns).reset_index().rename(columns={'index': 'Sector'}))
display(impact_df)


Sector,Impact
Agriculture,-0.5297965120309741
Manufacturing,0.0991279069812021
Services,0.2206395350563949
Households,0.0821947674368128
Government,0.2278343025465641


Databricks visualization. Run in Databricks to view.

## Incorporate the impact analysis results back into the initial Social Accounting Matrix (SAM) using the RAS method.

In [0]:
"""
Reciprocal Alteration of Selection

The RAS method iteratively adjusts the rows and columns of the original SAM to match the new target sums, ensuring the matrix remains balanced.

Adjusts the rows and columns of the matrix to match the target sums iteratively.

The adjustment is done by scaling the rows and columns using the factors R and S.
"""

def ras_method(A, target_row_sums, target_col_sums, max_iter=100, tol=1e-5):
    """
    Perform the RAS method to update the SAM matrix to match target row and column sums.
    """
    n = A.shape[0]
    R = np.ones(n)
    S = np.ones(n)
    for iteration in range(max_iter):
        # Adjust rows
        for i in range(n):
            R[i] = target_row_sums[i] / np.sum(A[i, :] * S)
        # Adjust columns
        for j in range(n):
            S[j] = target_col_sums[j] / np.sum(A[:, j] * R)
        # Update A
        A_new = (A.T * R).T * S
        # Check convergence
        if np.allclose(np.sum(A_new, axis=1), target_row_sums, atol=tol) and np.allclose(np.sum(A_new, axis=0), target_col_sums, atol=tol):
            break
        A = A_new
    return A

# Calculate the target row and column sums after impact analysis
original_row_sums = sam_matrix.sum(axis=1)
original_col_sums = sam_matrix.sum(axis=0)
target_row_sums = original_row_sums + impact_vector
target_col_sums = original_col_sums + impact_vector

# Apply the RAS method to update the SAM matrix
updated_sam = ras_method(sam_matrix, target_row_sums, target_col_sums)

print("Updated SAM matrix:")
print(updated_sam)

Updated SAM matrix:
[[ 0.         49.83666536 29.90836929 59.7776709  19.94963921]
 [39.85196587  0.         70.10609346 90.07753573 30.0616319 ]
 [29.90617712 50.09393077  0.         80.11500463 40.10524532]
 [59.76607149 90.09930263 80.10533033  0.         60.11131787]
 [19.945989   30.06922915 40.10084646 60.11198351  0.        ]]


Create a dataframe of the updated SAM matrix into for pretty display.

In [0]:
from pyspark.sql.types import StructType, StructField, FloatType

# Convert the updated_sam matrix to a list of tuples
updated_sam_list = [tuple(map(float, row)) for row in updated_sam]
print(updated_sam_list)

# Define column names
columns = ['Agriculture', 'Manufacturing', 'Services', 'Households', 'Government']

# Define schema 
schema = StructType([StructField(col, FloatType(), True) for col in columns])

# Create a Spark DataFrame
updated_sam_df = spark.createDataFrame(updated_sam_list, schema=schema)

# Display the Spark DataFrame
display(updated_sam_df)

[(0.0, 49.836665364406734, 29.908369288857532, 59.777670896138055, 19.949639205890094), (39.851965872055686, 0.0, 70.10609345902074, 90.07753572537099, 30.061631899293744), (29.90617712361375, 50.09393077023064, 0.0, 80.11500463446434, 40.10524532344316), (59.76607149424284, 90.09930262633088, 80.1053303318681, 0.0, 60.1113178739196), (19.94598899805673, 30.069229146012944, 40.100846455310005, 60.111983511463464, 0.0)]


Agriculture,Manufacturing,Services,Households,Government
0.0,49.836666,29.90837,59.77767,19.949638
39.851967,0.0,70.106094,90.07754,30.061632
29.906178,50.09393,0.0,80.115005,40.105244
59.76607,90.099304,80.10533,0.0,60.111317
19.94599,30.06923,40.100845,60.111984,0.0


## Perform the same impact analysis calculation in R

In [0]:
%r

# Define the SAM matrix
sam_matrix <- matrix(c(0, 50, 30, 60, 20,
                       40, 0, 70, 90, 30,
                       30, 50, 0, 80, 40,
                       60, 90, 80, 0, 60,
                       20, 30, 40, 60, 0), 
                     nrow = 5, byrow = TRUE)

colnames(sam_matrix) <- c('Agriculture', 'Manufacturing', 'Services', 'Households', 'Government')
rownames(sam_matrix) <- c('Agriculture', 'Manufacturing', 'Services', 'Households', 'Government')

# Normalize the SAM matrix by dividing each element by the sum of its column
column_sums <- colSums(sam_matrix)
normalized_sam <- sam_matrix / column_sums[col(sam_matrix)]

# Regularization parameter
epsilon <- 1e-10

# Add regularization to the diagonal
regularized_matrix <- normalized_sam + diag(epsilon, nrow(normalized_sam), ncol(normalized_sam))

# Function to compute the inverse and handle non-invertible matrices
invert_matrix <- function(matrix) {
  tryCatch({
    inv_matrix <- solve(matrix)
    return(inv_matrix)
  }, error = function(e) {
    cat("Matrix is not invertible:", e$message, "\n")
    return(NULL)
  })
}

# Compute the inverse of the regularized SAM matrix
inverse_sam <- invert_matrix(regularized_matrix)

if (!is.null(inverse_sam)) {
  cat("Normalized SAM matrix:\n")
  print(normalized_sam)
  cat("\nRegularized SAM matrix:\n")
  print(regularized_matrix)
  cat("\nInverse matrix of the regularized SAM:\n")
  print(inverse_sam)
  
  # Perform Shock Analysis: 10% increase in final demand for Agriculture
  shock_vector <- c(0.10, 0, 0, 0, 0)
  
  # Calculate the impact of the shock
  impact_vector <- inverse_sam %*% shock_vector
  
  # Convert the impact_vector to a Spark DataFrame
  sectors <- c('Agriculture', 'Manufacturing', 'Services', 'Households', 'Government')
  impact_df <- data.frame(Sector = sectors, Impact = as.numeric(impact_vector))
#  impact_df <- as.DataFrame(data.frame(Sector = sectors, Impact = as.numeric(impact_vector)))
  
  # Display the Spark DataFrame
  display(impact_df)
}



Sector,Impact
Agriculture,-0.5297965120309741
Manufacturing,0.0991279069812021
Services,0.2206395350563949
Households,0.0821947674368128
Government,0.2278343025465641


Databricks visualization. Run in Databricks to view.

Normalized SAM matrix:
              Agriculture Manufacturing  Services Households Government
Agriculture     0.0000000     0.2272727 0.1363636  0.2068966  0.1333333
Manufacturing   0.2666667     0.0000000 0.3181818  0.3103448  0.2000000
Services        0.2000000     0.2272727 0.0000000  0.2758621  0.2666667
Households      0.4000000     0.4090909 0.3636364  0.0000000  0.4000000
Government      0.1333333     0.1363636 0.1818182  0.2068966  0.0000000

Regularized SAM matrix:
               Agriculture Manufacturing     Services   Households   Government
Agriculture   0.0000000001  0.2272727273 0.1363636364 0.2068965517 0.1333333333
Manufacturing 0.2666666667  0.0000000001 0.3181818182 0.3103448276 0.2000000000
Services      0.2000000000  0.2272727273 0.0000000001 0.2758620690 0.2666666667
Households    0.4000000000  0.4090909091 0.3636363636 0.0000000001 0.4000000000
Government    0.1333333333  0.1363636364 0.1818181818 0.2068965517 0.0000000001

Inverse matrix of the regularized SAM:
