In [76]:
import pyspark as ps
import pyspark.sql.functions as f
import pyspark.sql.types as sparktypes
from pyspark.sql.functions import udf, col

In [2]:
spark = (ps.sql.SparkSession.builder 
    .master("local") 
    .appName("pipeline")
    .getOrCreate()
    )
sc = spark.sparkContext
sc.setLogLevel("WARN")

In [3]:
## loading responses with survey data into a spark dataframe
path = "../data/SharedResponsesSurvey_10000.csv"
responses = spark.read.csv(path, header=True)

## pullin out all the countries (n < 100)
path = "../data/country_cluster_map.csv"
countries = spark.read.csv(path, header=True).select("ISO3")

In [41]:
def p_intervention(dataf):
    '''
    Returns the proportion of choices in dataf that favored
    intervention over non-intervention, and n the number of choices analyzed.
        Params: dataf (Spark Dataframe)
        Returns: p (float), n (int)
    '''
    # probability of having chosen commission
    commits = dataf.filter("Saved = 1 AND Intervention = 1").count()
    # probability of having not chosen omission, meaning that the user must have chosen
    # commission in the scenario
    omits = dataf.filter("Saved = 0 AND Intervention = 0").count()
    n = dataf.count()
    try:
        return (round((commits + omits) / n, 4), n)
    except ZeroDivisionError:
        print("p_intervention received a dataframe without revelant entries.")
        return None

In [43]:
def p_legality(dataf):
    '''
    Returns p the proportion of choices from data in dataf that favored saving pedestrians
    crossing legally, and n the number of choices with the legal dimension.
        Params: dataf (Spark Dataframe)
        Returns: tuple: (p (float), n (int))
    '''
    legality = dataf.filter("CrossingSignal != 0 AND PedPed = 1")
    ## above line credit Edmond Awad, MMFunctionsShared.R
    ## found at: https://osf.io/3hvt2/files/
    n = legality.count()
    
    # probability of having chosen to save law-abiding
    peds = legality.filter("Saved = 1 AND CrossingSignal = 1").count()
    # probability of having chosen to not save non-law-abiding
    jwalkers = legality.filter("Saved = 0 AND CrossingSignal = 2").count()
    
    try:
        return (round((peds + jwalkers) / n, 4), n)
    except ZeroDivisionError:
        print("p_legality received a dataframe without revelant entries.")
        return None    

In [44]:
def p_factor(dataf, attribute):
    '''
       Returns the proportion of choices from data in dataf that favored the default choice, 
    default (str) the default choice for the factor, nondefault (str) the alternative choice
    for the factor, and n (int) the number of choices analyzed with the factor corresponding
    to the dimension.
        Parameters: dataf (Spark Dataframe), attribute (str)
        Returns: tuple: p (float), n (int), default (str), nondefault (str),
    '''
    attr = {"Utilitarian" : ['More', 'Less']\
              , "Gender" : ['Male', 'Female']\
              , "Social Status" : ['High', 'Low']\
              , "Age" : ['Young', 'Old']\
             , "Species" : []\
             , "Fitness" : []}
    try:
        default, nondefault = attr[attribute]
    except KeyError:
        print("p_factor received an invalid attribute.")
        return None  
    
    factor = dataf.filter(f"ScenarioType = '{attribute}' ")
    n = factor.count()
    # probability of having chosen the default
    defs = factor.filter(f"Saved = 1 AND AttributeLevel = '{default}'").count()
    # probability of having not chosen the nondefault
    nonnondefs = factor.filter(f"Saved = 0 AND AttributeLevel = '{nondefault}'").count()
    try:
        return ( round((defs + nonnondefs) / n, 4), n, default, nondefault )
    except ZeroDivisionError:
        print("p_factor received a dataframe without revelant entries.")
        return None   

In [13]:
import pandas as pd

In [81]:
country_probs = responses.select("UserCountry3").groupby("UserCountry3").count()
country_probs.select("UserCountry3").take(5)

[Row(UserCountry3='POL'),
 Row(UserCountry3='LVA'),
 Row(UserCountry3='BRA'),
 Row(UserCountry3='FRA'),
 Row(UserCountry3='ITA')]

In [None]:
country_probs = responses.select("UserCountry3").groupby("UserCountry3")

int_p = udf(lambda frame: p_intervention(frame)[0], sparktypes.FloatType())
country_probs = country_probs.apply(int_p)

In [87]:
## wanting to do it all in spark ... might be too hard
pandas_cols = ["ISO3", "p_intervention", "n_intervention", "p_legality", "n_legality",\
               "p_util", "n_util", "p_gender", "n_gender", \
               "p_social", "n_social", "p_age", "n_age"]
country_probs = responses.select("UserCountry3").groupby("UserCountry3").count()
#country_probs = pd.DataFrame(columns=pandas_cols)

country_probs = responses.select('*').groupby("UserCountry3").agg({})

for row in country_probs.select("UserCountry3").take(2):
    country = row.UserCountry3
    country_responses = responses.filter(f"UserCountry3 = '{country}' ")
    
    factors = ["Utilitarian", "Gender", "Social Status", "Age"]
    
    #intervention
    int_p = udf(lambda frame: p_intervention(frame)[0], sparktypes.FloatType())
    int_n = udf(lambda frame: p_intervention(frame)[1], sparktypes.IntegerType())
    country_probs = country_probs.withColumn('p_intervention', int_p(country_responses))
    country_probs = country_probs.withColumn('n_intervention', int_n(country_responses))
#     try:
#         #legality
#         #country_data_out.extend(p_legality(country_responses))
        
# #         for fac in factors:
# #             p, n, deflt, nondeflt = p_factor(country_responses, fac)
# #             country_data_out.extend((p, n))
#     except TypeError:
#         print(f"{country} had no relevant entries.")
#         continue
    
    country_probs.show()
#     country_data_out_df = pd.DataFrame([country_data_out], columns=pandas_cols)
#     country_probs = country_probs.append(country_data_out_df)


TypeError: Invalid argument, not a string or column: DataFrame[ResponseID: string, ExtendedSessionID: string, UserID: string, ScenarioOrder: string, Intervention: string, PedPed: string, Barrier: string, CrossingSignal: string, AttributeLevel: string, ScenarioTypeStrict: string, ScenarioType: string, DefaultChoice: string, NonDefaultChoice: string, DefaultChoiceIsOmission: string, NumberOfCharacters: string, DiffNumberOFCharacters: string, Saved: string, Template: string, DescriptionShown: string, LeftHand: string, UserCountry3: string, Review_age: string, Review_education: string, Review_gender: string, Review_income: string, Review_political: string, Review_religious: string] of type <class 'pyspark.sql.dataframe.DataFrame'>. For column literals, use 'lit', 'array', 'struct' or 'create_map' function.

In [70]:
country_pref_types = {"ISO3" : "str", "p_intervention" : "float64", "n_intervention" : "int64",\
                    "p_legality" : "float64", "n_legality" : "int64",\
                    "p_util" : "float64", "n_util" : "int64",\
                    "p_gender" : "float64", "n_gender" : "int64", \
                   "p_social" : "float64", "n_social" : "int64",\
                      "p_age" : "float64", "n_age" : "int64"}

data = pd.read_csv("../data/country_preferences.csv", dtype=country_pref_types)
data.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 58 entries, 0 to 57
Data columns (total 8 columns):
Unnamed: 0          58 non-null int64
ISO3                58 non-null object
p_n_intervention    58 non-null object
p_n_legality        56 non-null object
p_n_util            58 non-null object
p_n_gender          58 non-null object
p_n_social          58 non-null object
p_n_age             58 non-null object
dtypes: int64(1), object(7)
memory usage: 3.8+ KB
('(', '0', '.', '3', '8', '8', '9', ',', ' ', '3', '6', ')')
