In [15]:
#specify python install location above /home/ec2-user/SageMaker so that we don't have to 
#reinstall custom packages after every reboot.

import subprocess, sys, os, site

user_libs_path=os.path.expanduser("~")+"/SageMaker/.local"

if not os.path.exists(user_libs_path):
    os.makedirs(user_libs_path)

sys.path.insert(0, user_libs_path+'/lib/python3.6/site-packages')
site.USER_BASE=user_libs_path

my_env = os.environ.copy()
my_env["PYTHONUSERBASE"] = user_libs_path

process = subprocess.run("pip install -U --quiet sagemaker_pyspark"
                           .split(), env=my_env, stdout=subprocess.PIPE)
process.stderr




In [22]:
sys.path

['/home/ec2-user/SageMaker/.local/lib/python3.6/site-packages',
 '~/SageMaker/.local/lib/python3.6/site-packages',
 '~/SageMaker/.local/lib/python3.6/site-packages',
 '/tmp/spark-eeedd1bc-418e-40fe-a616-f4e8df631026/userFiles-4c5701b9-6ac2-41f1-bd51-e65f6ec0bdc3',
 '',
 '/home/ec2-user/anaconda3/envs/python3/lib/python36.zip',
 '/home/ec2-user/anaconda3/envs/python3/lib/python3.6',
 '/home/ec2-user/anaconda3/envs/python3/lib/python3.6/lib-dynload',
 '/home/ec2-user/anaconda3/envs/python3/lib/python3.6/site-packages',
 '/home/ec2-user/anaconda3/envs/python3/lib/python3.6/site-packages/IPython/extensions',
 '/home/ec2-user/.ipython']

In [19]:
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql.functions import sha2, concat_ws
import sys


In [3]:

def main():
    spark = SparkSession.builder.appName("Anonymize PySpark").getOrCreate()
    args_iter = iter(sys.argv[1:])
    args = dict(zip(args_iter, args_iter))
    
    #sample args for interactive testing
    #args = {'project_bucket': 'project1-lz', 'input_table': 'upload', 'output_table': 'raw', 'database': 'default', 'file_name': 'Tweets.csv'}

    project_bucket = args['project_bucket']
    input_table = args['input_table']
    output_table = args['output_table']
    database = args['database']
    input_s3_uri = 's3://' + project_bucket + '/' + input_table + '/' + args['file_name']
    output_s3_uri = 's3://' + project_bucket + '/' + output_table + '/' + args['file_name'].split('.')[0] + '-anon/'

    # Interactive pyspark from glue development endpoint allows reaading from glue crawlers
    # from awsglue.context import GlueContext
    #glueContext = GlueContext(SparkContext.getOrCreate())

    # Create a dataframe from glue catalog
    #df = glueContext.create_data_frame.from_catalog(database=database, table_name=input_table)

    #Print out information about this data
    #print("Count:  ", df.count())
    #df.printSchema()

    df = spark.read.csv(input_s3_uri, header=True)

    # replace each tweeters name with crc bigint
    dfAnnocrc = df.withColumn("annonym", sha2("name", 256)).select("annonym", "tweet_id", "airline", "airline_sentiment", "text")

    # write back to s3 as parquet
    dfAnnocrc.write.mode("append").parquet(output_s3_uri)


In [64]:
import boto3
gl_client = boto3.client('glue')
response = gl_client.get_table(DatabaseName='default', Name='upload')
response

{'Table': {'Name': 'upload',
  'DatabaseName': 'default',
  'Owner': 'owner',
  'CreateTime': datetime.datetime(2020, 11, 9, 15, 22, 5, tzinfo=tzlocal()),
  'UpdateTime': datetime.datetime(2020, 11, 9, 15, 22, 5, tzinfo=tzlocal()),
  'LastAccessTime': datetime.datetime(2020, 11, 9, 15, 22, 5, tzinfo=tzlocal()),
  'Retention': 0,
  'StorageDescriptor': {'Columns': [{'Name': 'tweet_id', 'Type': 'bigint'},
    {'Name': 'airline_sentiment', 'Type': 'string'},
    {'Name': 'airline_sentiment_confidence', 'Type': 'double'},
    {'Name': 'negativereason', 'Type': 'string'},
    {'Name': 'negativereason_confidence', 'Type': 'double'},
    {'Name': 'airline', 'Type': 'string'},
    {'Name': 'airline_sentiment_gold', 'Type': 'string'},
    {'Name': 'name', 'Type': 'string'},
    {'Name': 'negativereason_gold', 'Type': 'string'},
    {'Name': 'retweet_count', 'Type': 'bigint'},
    {'Name': 'text', 'Type': 'string'},
    {'Name': 'tweet_coord', 'Type': 'array<double>'},
    {'Name': 'tweet_create

In [None]:


if __name__ == "__main__":
    #main()




In [65]:
sys.path.append("./aws-glue-libs")
from awsglue.context import GlueContext
from awsglue.context import GlueContext

In [74]:
import sagemaker_pyspark
from pyspark.sql import SparkSession

classpath = ":".join(sagemaker_pyspark.classpath_jars())

spark = SparkSession.builder \
    .config("spark.driver.extraClassPath", classpath) \
    .config("hive.metastore.client.factory.class", "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory") \
    .config("hive.metastore.schema.verification", "false") \
    .config("spark.sql.warehouse.dir", "hdfs://ip-172-31-43-209.eu-west-1.compute.internal:8020/user/spark/warehouse") \
    .enableHiveSupport() \
    .getOrCreate()

In [75]:
spark.sql("show databases").show()

+------------+
|databaseName|
+------------+
|     default|
+------------+



In [76]:
spark.sql("use default").show()

++
||
++
++



In [77]:
spark.sql("show tables").show()

+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
+--------+---------+-----------+



In [46]:
spark.sparkContext.getConf().getAll()

[('spark.driver.port', '36025'),
 ('spark.executor.id', 'driver'),
 ('spark.app.name', 'pyspark-shell'),
 ('spark.driver.host', 'ip-172-16-10-201.eu-west-1.compute.internal'),
 ('spark.sql.catalogImplementation', 'hive'),
 ('spark.rdd.compress', 'True'),
 ('spark.driver.extraClassPath',
  '/home/ec2-user/anaconda3/envs/python3/lib/python3.6/site-packages/sagemaker_pyspark/jars/aws-java-sdk-sts-1.11.835.jar:/home/ec2-user/anaconda3/envs/python3/lib/python3.6/site-packages/sagemaker_pyspark/jars/hadoop-aws-2.8.1.jar:/home/ec2-user/anaconda3/envs/python3/lib/python3.6/site-packages/sagemaker_pyspark/jars/aws-java-sdk-kms-1.11.835.jar:/home/ec2-user/anaconda3/envs/python3/lib/python3.6/site-packages/sagemaker_pyspark/jars/hadoop-common-2.8.1.jar:/home/ec2-user/anaconda3/envs/python3/lib/python3.6/site-packages/sagemaker_pyspark/jars/aws-java-sdk-sagemaker-1.11.835.jar:/home/ec2-user/anaconda3/envs/python3/lib/python3.6/site-packages/sagemaker_pyspark/jars/hadoop-auth-2.8.1.jar:/home/ec2-us