# In [None]:
import datetime
import json
import os
import urllib.parse

import boto3
import urllib3

# S3 bucket holding the job-ID files and the Firehose delivery stream that
# receives the fetched job details. Both can be overridden per deployment via
# environment variables; the defaults are the values previously hard-coded here.
S3_BUCKET_NAME = os.environ.get('S3_BUCKET_NAME', 'myjobsdata')
FIREHOSE_NAME = os.environ.get('FIREHOSE_NAME', 'PUT-S3-JcmCd')

# Coresignal job-collect endpoint; the (URL-encoded) job ID is appended to it.
API_URL = 'https://api.coresignal.com/cdapi/v1/professional_network/job/collect/'
# API credentials must never live in source control. Supply the bearer token
# through the CORESIGNAL_API_KEY environment variable (e.g. populated from
# AWS Secrets Manager / SSM Parameter Store). The token that was previously
# committed here should be treated as compromised and rotated.
API_KEY = os.environ.get('CORESIGNAL_API_KEY', '')

def lambda_handler(event, context):
    """
    Lambda entry point: read job-ID files from S3, fetch each job's details
    from the API and forward them to Firehose.

    Every object in ``S3_BUCKET_NAME`` is read as UTF-8 text with one JSON
    value per line: either an integer job ID or an object whose ``"value"``
    key holds the job ID. Other lines are logged and skipped.

    Args:
        event: Lambda event payload (unused).
        context: Lambda context object (unused).

    Returns:
        dict: ``{'statusCode': 200, 'body': <JSON message>}`` when every file
        was processed, or ``{'statusCode': 500, 'body': 'An error occurred: ...'}``
        if any step raised (empty bucket, a file with no valid job IDs, an
        S3/Firehose error, ...). Records already sent to Firehose before the
        failure are not rolled back.
    """
    s3_client = boto3.client('s3')
    http = urllib3.PoolManager()
    firehose_client = boto3.client('firehose')

    try:
        # Step 1: List every object in the bucket. A single list_objects_v2
        # call returns at most 1,000 keys, so follow pagination.
        keys = _list_object_keys(s3_client, S3_BUCKET_NAME)
        if not keys:
            raise ValueError(f"No files found in S3 bucket: {S3_BUCKET_NAME}")

        for key in keys:
            # Step 2: Download the file from S3
            file_data = s3_client.get_object(Bucket=S3_BUCKET_NAME, Key=key)
            raw_content = file_data['Body'].read().decode('utf-8')

            # Step 3: Extract the job IDs, one JSON value per line
            job_ids = _parse_job_ids(raw_content)
            if not job_ids:
                raise ValueError(f"No valid job IDs found in file {key}")

            # Steps 4-5: Fetch each job's details and send successes to Firehose
            for job_id in job_ids:
                api_response = fetch_job_details(http, job_id)
                if api_response:
                    send_to_firehose(firehose_client, api_response)

        return {
            'statusCode': 200,
            'body': json.dumps({'message': 'Job details successfully processed and sent to Firehose'})
        }

    except Exception as e:
        # Log the failure so it is visible in the function's logs, not only
        # in the returned response.
        print(f"An error occurred: {str(e)}")
        return {
            'statusCode': 500,
            'body': f"An error occurred: {str(e)}"
        }


def _list_object_keys(s3_client, bucket):
    """Return every object key in *bucket*, following list_objects_v2 pagination.

    Keys ending in '/' (folder placeholders, e.g. created by the S3 console)
    are skipped: they hold no job IDs and would otherwise abort the run as
    an empty file.
    """
    paginator = s3_client.get_paginator('list_objects_v2')
    keys = []
    for page in paginator.paginate(Bucket=bucket):
        # Pages for an empty bucket/prefix carry no 'Contents' entry at all.
        keys.extend(
            obj['Key'] for obj in page.get('Contents', []) if not obj['Key'].endswith('/')
        )
    return keys


def _parse_job_ids(raw_content):
    """Extract job IDs from newline-delimited JSON text.

    A non-blank line yields a job ID if it is a JSON integer or a JSON object
    with a ``"value"`` key; any other line is logged and skipped.
    """
    job_ids = []
    for line in raw_content.splitlines():
        stripped = line.strip()
        if not stripped:
            continue  # blank lines are not errors
        try:
            parsed_line = json.loads(stripped)
        except json.JSONDecodeError as e:
            print(f"Skipping invalid JSON line: {stripped}, Error: {e}")
            continue

        if isinstance(parsed_line, dict) and 'value' in parsed_line:
            job_ids.append(parsed_line['value'])  # Extract job ID from "value"
        # bool is a subclass of int, so JSON true/false must be excluded explicitly.
        elif isinstance(parsed_line, int) and not isinstance(parsed_line, bool):
            job_ids.append(parsed_line)  # Directly add integers
        else:
            print(f"Skipping invalid job ID: {stripped}")
    return job_ids


def fetch_job_details(http, job_id, timeout=30.0):
    """
    Fetch the details of a single job from the API.

    Args:
        http: urllib3 ``PoolManager`` (or any object with a compatible
            ``request(method, url, headers=..., timeout=...)`` method).
        job_id: Job identifier read from the S3 input file. It is converted
            to text, URL-encoded and appended to ``API_URL``.
        timeout: Seconds to allow for the request (optional, default 30).
            Without a timeout a stalled API call would block until the
            Lambda itself times out.

    Returns:
        The decoded JSON response with ``job_id`` and a UTC ``timestamp``
        (ISO 8601, no offset) added, or ``None`` when the request fails,
        returns a non-200 status, or the body cannot be decoded. Failures are
        printed rather than raised so one bad job does not stop the batch.
    """
    headers = {
        'Authorization': f'Bearer {API_KEY}',
        'Content-Type': 'application/json'
    }
    try:
        # Encode the ID so characters such as '/', '?' or spaces cannot
        # change the request path or query string.
        url = f"{API_URL}{urllib.parse.quote(str(job_id), safe='')}"
        response = http.request('GET', url, headers=headers, timeout=timeout)

        if response.status != 200:
            print(f"Failed to fetch details for Job ID {job_id}: {response.status}")
            return None

        data = json.loads(response.data.decode('utf-8'))
        data['job_id'] = job_id
        # datetime.utcnow() is deprecated since Python 3.12. Build an aware UTC
        # time, then drop tzinfo so the serialized format stays the same.
        data['timestamp'] = (
            datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None).isoformat()
        )
        return data

    except Exception as e:
        # Deliberate best-effort boundary: report and skip this job.
        print(f"Error fetching job details for Job ID {job_id}: {str(e)}")
        return None


def send_to_firehose(client, record):
    """
    Deliver one record to the configured Firehose delivery stream.

    The record is serialized to JSON and terminated with a newline. Firehose
    does not insert a delimiter between records, so the trailing newline keeps
    the delivered output newline-delimited (one JSON document per line).
    Any exception raised by ``put_record`` propagates to the caller.
    """
    payload = json.dumps(record) + '\n'
    client.put_record(DeliveryStreamName=FIREHOSE_NAME, Record={'Data': payload})