In [7]:
import os

import pymongo

# Connection string is read from the environment so that credentials never live
# in the notebook (or in its version-control history). Set it before starting
# Jupyter, e.g.:
#   export MONGO_URI="mongodb+srv://<user>:<password>@<cluster-host>/"
# NOTE(review): a password was previously committed in this cell; rotate it.
MONGO_URI = os.environ.get("MONGO_URI")
if not MONGO_URI:
    raise RuntimeError(
        "Set the MONGO_URI environment variable to the MongoDB connection string."
    )

# `db` is the handle used by every later cell.
client = pymongo.MongoClient(MONGO_URI)
db = client["Raw_data"]

# Create these indexes (if they do not already exist) to speed up the $lookup join.


#db["Stock_performance"].create_index([("symbol", 1), ("date", 1)])
#db["Insider_Trading_cleaned"].create_index([("Company", 1), ("Date_transactions", 1)])

In [8]:
# Aggregation that joins each stock-day document with the insider-trading
# filings for the same company and date, then adds per-filing and per-day
# transaction counts.


def _txn_array_or_empty(owner):
    """Expression for `<owner>.filtered_nonDerivativeTransaction`, or [] when it is missing/null."""
    return {"$ifNull": [f"{owner}.filtered_nonDerivativeTransaction", []]}


def _count_with_code(code):
    """Expression counting a filing's transactions whose acquired/disposed code equals `code`."""
    return {
        "$size": {
            "$filter": {
                "input": _txn_array_or_empty("$$transaction"),
                "as": "txn",
                "cond": {
                    "$eq": [
                        "$$txn.transactionAmounts.transactionAcquiredDisposedCode.value",
                        code,
                    ]
                },
            }
        }
    }


# Join condition: same company symbol AND same calendar date.
_join_condition = {
    "$match": {
        "$expr": {
            "$and": [
                {"$eq": ["$Company", "$$stock_symbol"]},
                {"$eq": ["$Date_transactions", "$$stock_date"]},
            ]
        }
    }
}

# Fields kept from every matched filing; `_id` is dropped.
_kept_filing_fields = [
    "Date_transactions",
    "filtered_nonDerivativeTransaction",  # array of transactions per person, as stored in the raw data
    "Entity_name_cleaned",
    "isDirector",
    "isOfficer",
    "isTenPercentOwner",
    "isOther",
    "officerTitle",
    "otherText",
]
_filing_projection = {"$project": {"_id": 0, **{name: 1 for name in _kept_filing_fields}}}

_lookup_stage = {
    "$lookup": {
        "from": "Insider_Trading_cleaned",
        "let": {"stock_symbol": "$symbol", "stock_date": "$date"},
        "pipeline": [_join_condition, _filing_projection],
        "as": "transactions_from_lookup",
    }
}

# Per-filing enrichment: every element of `transactions_from_lookup` is merged
# with three counters computed from its own transaction array.
_filings_with_counts = {
    "$map": {
        "input": "$transactions_from_lookup",
        "as": "transaction",
        "in": {
            "$mergeObjects": [
                "$$transaction",
                {"total_entity_transactions": {"$size": _txn_array_or_empty("$$transaction")}},
                {"count_acquired": _count_with_code("A")},
                {"count_disposed": _count_with_code("D")},
            ]
        },
    }
}

# Top-level total: sum of the transaction-array sizes over all matched filings,
# or 0 when nothing matched.
_sum_of_array_sizes = {
    "$reduce": {
        "input": "$transactions_from_lookup",
        "initialValue": 0,
        "in": {"$add": ["$$value", {"$size": _txn_array_or_empty("$$this")}]},
    }
}
_day_total = {
    "$cond": {
        "if": {"$gt": [{"$size": "$transactions_from_lookup"}, 0]},
        "then": _sum_of_array_sizes,
        "else": 0,
    }
}

_enrich_stage = {
    "$addFields": {
        "transactions_from_lookup": _filings_with_counts,
        "total_transactions": _day_total,
    }
}

pipeline = [_lookup_stage, _enrich_stage]

# Run the join against Stock_performance and materialise every joined document
# in memory; `result` is consumed by the bulk-write cell further down.
result = list(db["Stock_performance"].aggregate(pipeline))
print(len(result))  # Check how many documents are in the result

# Preview only a few documents: printing hundreds of full documents floods the
# notebook output and bloats the saved file.
SAMPLE_START = 900
SAMPLE_SIZE = 3
for sample_doc in result[SAMPLE_START:SAMPLE_START + SAMPLE_SIZE]:
    print(sample_doc)


62888
[{'_id': ObjectId('67b34c5ee1025b242fd24cda'), 'symbol': 'AAPL', 'date': datetime.datetime(2021, 7, 16, 0, 0), 'close': 146.39, 'open': 148.46, 'high': 149.76, 'low': 145.88, 'volume': 93251426, 'change_close_next_1_day': None, 'change_close_next_5_days': 1.48, 'change_close_next_15_days': -0.17, 'change_volume_next_1_day': None, 'change_volume_next_5_days': -23.38, 'change_volume_next_15_days': -41.96, 'transactions_from_lookup': [], 'total_transactions': 0}, {'_id': ObjectId('67b34c5ee1025b242fd24cdb'), 'symbol': 'AAPL', 'date': datetime.datetime(2021, 7, 15, 0, 0), 'close': 148.48, 'open': 149.24, 'high': 150.0, 'low': 147.09, 'volume': 106820297, 'change_close_next_1_day': -1.41, 'change_close_next_5_days': -1.13, 'change_close_next_15_days': -0.96, 'change_volume_next_1_day': -12.7, 'change_volume_next_5_days': -27.6, 'change_volume_next_15_days': -56.56, 'transactions_from_lookup': [], 'total_transactions': 0}, {'_id': ObjectId('67b34c5ee1025b242fd24cdc'), 'symbol': 'AAPL',

In [9]:
# Persist the joined documents into the integrated collection, in batches.
import time

from pymongo import InsertOne, ReplaceOne  # InsertOne kept: it was imported here originally

STOCK_Insider_Trading = db["Stock_InsiderTrading_integrated"]

BATCH_SIZE = 800  # documents per bulk_write call; adjust as needed
BATCH_PAUSE_SECONDS = 0.5  # pause between consecutive batches

# The joined documents still carry the `_id` of their source document, so a
# plain insert fails with a duplicate-key error as soon as this cell is re-run
# (or resumed after a partial failure). Upserting by `_id` makes the cell
# idempotent: a first run inserts, later runs overwrite the same documents.
for batch_start in range(0, len(result), BATCH_SIZE):
    bulk_operations = [
        ReplaceOne({"_id": doc["_id"]}, doc, upsert=True)
        for doc in result[batch_start:batch_start + BATCH_SIZE]
    ]
    STOCK_Insider_Trading.bulk_write(bulk_operations)

    # Only pause when another batch follows.
    if batch_start + BATCH_SIZE < len(result):
        time.sleep(BATCH_PAUSE_SECONDS)




In [10]:
# Compound ascending index on (symbol, date) for the integrated collection.
integrated_index_keys = [("symbol", pymongo.ASCENDING), ("date", pymongo.ASCENDING)]
db["Stock_InsiderTrading_integrated"].create_index(integrated_index_keys)

'symbol_1_date_1'