In [1]:
from pyspark.sql import SparkSession

# JARs required by the Redshift data source: the JDBC driver, the
# spark-redshift connector, and the Avro / minimal-json libraries shipped
# alongside the connector.
jars = [
    '/usr/share/aws/redshift/jdbc/RedshiftJDBC.jar',
    '/usr/share/aws/redshift/spark-redshift/lib/spark-redshift.jar',
    '/usr/share/aws/redshift/spark-redshift/lib/spark-avro.jar',
    '/usr/share/aws/redshift/spark-redshift/lib/minimal-json.jar',
]

# `spark.jars` expects one comma-separated string of paths.
spark_jars = ','.join(jars)

# Create (or reuse) a YARN-backed session with the JARs above attached.
spark = (
    SparkSession.builder
    .appName('Redshift Integration')
    .master('yarn')
    .config('spark.jars', spark_jars)
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/07/14 12:55:08 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.


In [2]:
# Read the landed GitHub activity JSON for 2022-06-19 (glob over that day's
# files) and expose it to Spark SQL as the `ghactivity` view.
ghactivity_path = 's3://aigithub/landing/ghactivity/2022-06-19*'
ghactivity = spark.read.json(ghactivity_path)
ghactivity.createOrReplaceTempView('ghactivity')

# Keep only events that created a repository, flattening the nested
# repo/actor/payload fields and deriving year/month/day from `created_at`.
new_repos_sql = """
    SELECT
        repo.id AS repo_id,
        repo.name AS repo_name,
        actor.id AS actor_id,
        actor.login AS actor_login,
        actor.display_login AS actor_display_login,
        payload.ref_type AS ref_type,
        type,
        created_at,
        year(created_at) AS created_year,
        month(created_at) AS created_month,
        dayofmonth(created_at) AS created_dayofmonth
    FROM ghactivity
    WHERE type = 'CreateEvent'
        AND payload.ref_type = 'repository'
"""
new_repos = spark.sql(new_repos_sql)

22/07/14 12:56:18 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


In [3]:
# Print a sample of rows to eyeball the projected columns; long string
# values are shown truncated in this tabular preview.
new_repos.show()

                                                                                

+---------+--------------------+---------+------------------+-------------------+----------+-----------+--------------------+------------+-------------+------------------+
|  repo_id|           repo_name| actor_id|       actor_login|actor_display_login|  ref_type|       type|          created_at|created_year|created_month|created_dayofmonth|
+---------+--------------------+---------+------------------+-------------------+----------+-----------+--------------------+------------+-------------+------------------+
|505110810|   Anand4311/Project|107799449|         Anand4311|          Anand4311|repository|CreateEvent|2022-06-19T13:00:01Z|        2022|            6|                19|
|505110809|direwolf-github/e...| 10810283|   direwolf-github|    direwolf-github|repository|CreateEvent|2022-06-19T13:00:01Z|        2022|            6|                19|
|505110811|shakil033/e-comme...| 61247278|         shakil033|          shakil033|repository|CreateEvent|2022-06-19T13:00:01Z|        2022|  

In [4]:
# Inspect column names and types; note that `created_at` is still a string
# while the derived year/month/day columns are integers.
new_repos.printSchema()

root
 |-- repo_id: long (nullable = true)
 |-- repo_name: string (nullable = true)
 |-- actor_id: long (nullable = true)
 |-- actor_login: string (nullable = true)
 |-- actor_display_login: string (nullable = true)
 |-- ref_type: string (nullable = true)
 |-- type: string (nullable = true)
 |-- created_at: string (nullable = true)
 |-- created_year: integer (nullable = true)
 |-- created_month: integer (nullable = true)
 |-- created_dayofmonth: integer (nullable = true)



In [5]:
# Count the rows that matched the CreateEvent/repository filter. This is a
# Spark action, so it runs a job over the input data.
new_repos.count()

                                                                                

122496

```python
# Original logic to write to files
new_repos. \
  write. \
  partitionBy('created_year', 'created_month', 'created_dayofmonth'). \
  mode('overwrite'). \
  parquet('s3://aigithub/github_dm/new_repos')
```

* Make sure the Redshift cluster has been granted the required permissions on the S3 bucket via an IAM role.
* Come up with the logic to write the DataFrame to a Redshift table.

In [6]:
import json
from urllib.parse import quote

import boto3

# Pull the Redshift connection details from AWS Secrets Manager so that no
# credentials are written into the notebook itself.
sm_client = boto3.client('secretsmanager')
secret_value = sm_client.get_secret_value(SecretId='demo/aigithub/redshift')
credentials = json.loads(secret_value['SecretString'])

username = credentials['username']
password = credentials['password']
host = credentials['host']
port = credentials['port']
database = 'github_dm'

# Percent-encode the user and password before placing them in the query
# string: a raw '&', '#', '%', ';' or '+' in the password would otherwise
# split or corrupt the connection parameters.
# NOTE(review): assumes the bundled Redshift JDBC driver URL-decodes
# connection-string values - TODO confirm for the installed RedshiftJDBC.jar.
# Values made only of letters, digits and '-', '_', '.', '~' are unchanged.
encoded_username = quote(str(username), safe='')
encoded_password = quote(str(password), safe='')
url = (
    f"jdbc:redshift://{host}:{port}/{database}"
    f"?user={encoded_username}&password={encoded_password}"
)

In [7]:
# Row count again, as a sanity check right before the Redshift write.
# NOTE(review): this repeats the earlier count cell; each call recomputes
# the result from the source files unless the DataFrame has been cached.
new_repos.count()

                                                                                

122496

In [8]:
# Delete everything under the S3 prefix that the Redshift write uses as its
# `tempdir`, clearing staging files (Avro parts, manifests) from prior runs.
!aws s3 rm s3://aigithub/temp/ghrepos --recursive

delete: s3://aigithub/temp/ghrepos/6e045934-9e62-4c11-9090-07c78c44976e/_SUCCESS
delete: s3://aigithub/temp/ghrepos/6e045934-9e62-4c11-9090-07c78c44976e/part-00001-12071e9d-3248-4bda-a46e-8793e06ae0fc-c000.avro
delete: s3://aigithub/temp/ghrepos/6e045934-9e62-4c11-9090-07c78c44976e/part-00006-12071e9d-3248-4bda-a46e-8793e06ae0fc-c000.avro
delete: s3://aigithub/temp/ghrepos/6e045934-9e62-4c11-9090-07c78c44976e/part-00004-12071e9d-3248-4bda-a46e-8793e06ae0fc-c000.avro
delete: s3://aigithub/temp/ghrepos/6e045934-9e62-4c11-9090-07c78c44976e/manifest.json
delete: s3://aigithub/temp/ghrepos/6e045934-9e62-4c11-9090-07c78c44976e/part-00003-12071e9d-3248-4bda-a46e-8793e06ae0fc-c000.avro
delete: s3://aigithub/temp/ghrepos/6e045934-9e62-4c11-9090-07c78c44976e/part-00007-12071e9d-3248-4bda-a46e-8793e06ae0fc-c000.avro
delete: s3://aigithub/temp/ghrepos/6e045934-9e62-4c11-9090-07c78c44976e/part-00002-12071e9d-3248-4bda-a46e-8793e06ae0fc-c000.avro
delete: s3://aigithub/temp/ghrepos/6e045934-9e62-4c11

In [12]:
# Options for the spark-redshift data source: the IAM role passed to Redshift
# for S3 access, the JDBC URL, the target table, and the S3 staging prefix.
redshift_options = {
    'aws_iam_role': 'arn:aws:iam::269066542444:role/service-role/AmazonRedshift-CommandsAccessRole-20220625T110940',
    'url': url,
    'dbtable': 'public.ghrepos',
    'tempdir': 's3://aigithub/temp/ghrepos',
}

# Append the rows to public.ghrepos; data is staged under `tempdir` first.
(
    new_repos.write
    .mode('append')
    .format('io.github.spark_redshift_community.spark.redshift')
    .options(**redshift_options)
    .save()
)

22/07/14 13:03:30 WARN CredentialsLegacyConfigLocationProvider: Found the legacy config profiles file at [/home/hadoop/.aws/config]. Please move it to the latest default location [~/.aws/credentials].
22/07/14 13:03:30 WARN Utils$: The S3 bucket aigithub does not have an object lifecycle configuration to ensure cleanup of temporary files. Consider configuring `tempdir` to point to a bucket with an object lifecycle policy that automatically deletes files after an expiration period. For more information, see https://docs.aws.amazon.com/AmazonS3/latest/dev/object-lifecycle-mgmt.html
                                                                                

In [13]:
# Count how many objects now sit under the staging prefix after the write.
!aws s3 ls s3://aigithub/temp/ghrepos --recursive|wc -l

22


In [11]:
# List the staged objects (Avro part files, manifest.json, _SUCCESS) under
# the write's `tempdir` prefix.
# NOTE(review): this cell's execution count (11) is lower than the write
# cell's (12), so the saved listing comes from an earlier run; re-run it
# after the write to refresh the output.
!aws s3 ls s3://aigithub/temp/ghrepos --recursive

2022-07-14 12:57:45          0 temp/ghrepos/141aa32f-a7d2-4b89-b48a-123cdca01005/_SUCCESS
2022-07-14 12:57:45       1372 temp/ghrepos/141aa32f-a7d2-4b89-b48a-123cdca01005/manifest.json
2022-07-14 12:57:25     600396 temp/ghrepos/141aa32f-a7d2-4b89-b48a-123cdca01005/part-00000-c2fd703c-3eb1-4168-8a97-00f16ab095ca-c000.avro
2022-07-14 12:57:25     618771 temp/ghrepos/141aa32f-a7d2-4b89-b48a-123cdca01005/part-00001-c2fd703c-3eb1-4168-8a97-00f16ab095ca-c000.avro
2022-07-14 12:57:29     580568 temp/ghrepos/141aa32f-a7d2-4b89-b48a-123cdca01005/part-00002-c2fd703c-3eb1-4168-8a97-00f16ab095ca-c000.avro
2022-07-14 12:57:29     489691 temp/ghrepos/141aa32f-a7d2-4b89-b48a-123cdca01005/part-00003-c2fd703c-3eb1-4168-8a97-00f16ab095ca-c000.avro
2022-07-14 12:57:34     630458 temp/ghrepos/141aa32f-a7d2-4b89-b48a-123cdca01005/part-00004-c2fd703c-3eb1-4168-8a97-00f16ab095ca-c000.avro
2022-07-14 12:57:34     684348 temp/ghrepos/141aa32f-a7d2-4b89-b48a-123cdca01005/part-00005-c2fd703c-3eb1-4168-8a97-00f1