From 0b2494d3afdb2bf47f0584545e00545958c9a9b6 Mon Sep 17 00:00:00 2001 From: Vaisakh K M Date: Sat, 15 Nov 2025 08:00:01 +0530 Subject: [PATCH] fix: Girls don't DM back algorithm issue. --- .../scalding/InterestedInFromKnownFor.scala | 345 ++++++++++++++++-- 1 file changed, 321 insertions(+), 24 deletions(-) diff --git a/src/scala/com/twitter/simclusters_v2/scalding/InterestedInFromKnownFor.scala b/src/scala/com/twitter/simclusters_v2/scalding/InterestedInFromKnownFor.scala index ab2cbde2d..f555af4c0 100644 --- a/src/scala/com/twitter/simclusters_v2/scalding/InterestedInFromKnownFor.scala +++ b/src/scala/com/twitter/simclusters_v2/scalding/InterestedInFromKnownFor.scala @@ -85,6 +85,33 @@ trait InterestedInFromKnownForBatchBase extends TwitterScheduledExecutionApp { val socialProofThreshold = args.int("socialProofThreshold", 2) val maxClustersPerUser = args.int("maxClustersPerUser", 50) + // Use the standard run method, or runWithTopicAlignment for topic-aligned boosting + // To enable topic alignment: User B will receive more recommendations from User A + // for topics that are aligned between both users. 
+ // Example usage with topic alignment and DM (Direct Message) support: + // val previousInterestedIn = DAL.readMostRecentSnapshot(outputKVDataset, dateRange.prepend(lookBackDays)) + // .toTypedPipe.map { case KeyVal(user, interestedIn) => (user, interestedIn) } + // // Extract DM data from Interaction Graph + // import com.twitter.interaction_graph.hdfs_sources.InteractionGraphHistoryAggregatedEdgeSnapshotScalaDataset + // val interactionGraphEdges = DAL.readMostRecentSnapshot( + // InteractionGraphHistoryAggregatedEdgeSnapshotScalaDataset, + // dateRange + // ).toTypedPipe + // val dmData = InterestedInFromKnownFor.extractDirectMessageData( + // interactionGraphEdges, + // useEwma = true, // Use exponentially weighted moving average for recent DM interactions + // minScore = 0.1 // Minimum DM score threshold + // ) + // val result = InterestedInFromKnownFor.runWithTopicAlignment( + // normalizedGraph, + // knownFor, + // socialProofThreshold, + // maxClustersPerUser, + // knownForModelVersion, + // topicAlignmentBoost = 1.5, // 1.5x boost for aligned topics + // existingInterestedIn = Some(previousInterestedIn), + // messagingData = Some(dmData) // Include DM data for topic-aligned boosting + // ) val result = InterestedInFromKnownFor .run( normalizedGraph, @@ -188,6 +215,75 @@ object DumpInterestedInAdhoc extends TwitterExecutionApp { object InterestedInFromKnownFor { private def ifNanMake0(x: Double): Double = if (x.isNaN) 0.0 else x + /** + * Extract direct message (DM) interaction data from Interaction Graph edges. + * This function reads from the aggregated interaction graph dataset and extracts + * the num_direct_messages feature for each user pair. + * + * @param interactionGraphEdges TypedPipe of Edge from Interaction Graph. + * Each Edge contains a list of EdgeFeature objects. + * We look for EdgeFeature with FeatureName.NumDirectMessages. + * @param useEwma If true, uses exponentially weighted moving average (ewma) from TimeSeriesStatistics. 
+ * If false, uses mean. Default true (ewma is better for recent interactions). + * @param minScore Minimum score threshold. Edges with scores below this will be filtered out. + * @return TypedPipe of (sourceUserId, destUserId, messageScore) tuples. + * Format matches the messagingData parameter expected by runWithTopicAlignment. + * + * Example usage: + * {{{ + * import com.twitter.interaction_graph.hdfs_sources.InteractionGraphHistoryAggregatedEdgeSnapshotScalaDataset + * val interactionGraphEdges = DAL.readMostRecentSnapshot( + * InteractionGraphHistoryAggregatedEdgeSnapshotScalaDataset, + * dateRange + * ).toTypedPipe + * val dmData = InterestedInFromKnownFor.extractDirectMessageData( + * interactionGraphEdges, + * useEwma = true, + * minScore = 0.1 + * ) + * }}} + */ + def extractDirectMessageData( + interactionGraphEdges: TypedPipe[com.twitter.interaction_graph.thriftscala.Edge], + useEwma: Boolean = true, + minScore: Double = 0.1 + )( + implicit uniqueId: UniqueID + ): TypedPipe[(UserId, UserId, Double)] = { + val numEdgesWithDM = Stat("num_edges_with_direct_messages") + val numEdgesWithoutDM = Stat("num_edges_without_direct_messages") + val numEdgesFilteredByMinScore = Stat("num_edges_filtered_by_min_score") + + interactionGraphEdges + .flatMap { edge => + // Find the EdgeFeature with FeatureName.NumDirectMessages + val dmFeature = edge.features.find(_.name == com.twitter.interaction_graph.thriftscala.FeatureName.NumDirectMessages) + + dmFeature match { + case Some(feature) => + numEdgesWithDM.inc() + // Extract score from TimeSeriesStatistics + // Use ewma (exponentially weighted moving average) for recent interactions, + // or mean for average over time + val score = if (useEwma) { + feature.tss.ewma + } else { + feature.tss.mean + } + + if (score >= minScore) { + Some((edge.sourceId, edge.destinationId, score)) + } else { + numEdgesFilteredByMinScore.inc() + None + } + case None => + numEdgesWithoutDM.inc() + None + } + } + } + case class 
SrcClusterIntermediateInfo( followScore: Double, followScoreProducerNormalized: Double, @@ -195,20 +291,26 @@ object InterestedInFromKnownFor { favScoreProducerNormalized: Double, logFavScore: Double, logFavScoreProducerNormalized: Double, + messageScore: Double, + messageScoreProducerNormalized: Double, followSocialProof: List[Long], - favSocialProof: List[Long]) { + favSocialProof: List[Long], + messageSocialProof: List[Long]) { // overriding for the sake of unit tests override def equals(obj: scala.Any): Boolean = { obj match { case that: SrcClusterIntermediateInfo => math.abs(followScore - that.followScore) < 1e-5 && - math.abs(followScoreProducerNormalized - that.followScoreProducerNormalized) < 1e-5 && - math.abs(favScore - that.favScore) < 1e-5 && - math.abs(favScoreProducerNormalized - that.favScoreProducerNormalized) < 1e-5 && - math.abs(logFavScore - that.logFavScore) < 1e-5 && - math.abs(logFavScoreProducerNormalized - that.logFavScoreProducerNormalized) < 1e-5 && - followSocialProof.toSet == that.followSocialProof.toSet && - favSocialProof.toSet == that.favSocialProof.toSet + math.abs(followScoreProducerNormalized - that.followScoreProducerNormalized) < 1e-5 && + math.abs(favScore - that.favScore) < 1e-5 && + math.abs(favScoreProducerNormalized - that.favScoreProducerNormalized) < 1e-5 && + math.abs(logFavScore - that.logFavScore) < 1e-5 && + math.abs(logFavScoreProducerNormalized - that.logFavScoreProducerNormalized) < 1e-5 && + math.abs(messageScore - that.messageScore) < 1e-5 && + math.abs(messageScoreProducerNormalized - that.messageScoreProducerNormalized) < 1e-5 && + followSocialProof.toSet == that.followSocialProof.toSet && + favSocialProof.toSet == that.favSocialProof.toSet && + messageSocialProof.toSet == that.messageSocialProof.toSet case _ => false } } @@ -230,9 +332,13 @@ object InterestedInFromKnownFor { logFavScore = left.logFavScore + right.logFavScore, logFavScoreProducerNormalized = left.logFavScoreProducerNormalized + 
right.logFavScoreProducerNormalized, + messageScore = left.messageScore + right.messageScore, + messageScoreProducerNormalized = + left.messageScoreProducerNormalized + right.messageScoreProducerNormalized, followSocialProof = Semigroup.plus(left.followSocialProof, right.followSocialProof).distinct, - favSocialProof = Semigroup.plus(left.favSocialProof, right.favSocialProof).distinct + favSocialProof = Semigroup.plus(left.favSocialProof, right.favSocialProof).distinct, + messageSocialProof = Semigroup.plus(left.messageSocialProof, right.messageSocialProof).distinct ) } } @@ -252,6 +358,47 @@ object InterestedInFromKnownFor { socialProofThreshold: Int )( implicit uniqueId: UniqueID + ): TypedPipe[((Long, Int), SrcClusterIntermediateInfo)] = { + userClusterPairsWithoutNormalizationWithTopicAlignment( + adjacencyLists, + knownFor, + socialProofThreshold, + topicAlignmentBoost = 1.0, + existingInterestedIn = None + ) + } + + /** + * Enhanced version that supports topic-aligned boosting. + * When User B follows/favorites/messages User A, this boosts clusters that are aligned between both users. + * + * @param adjacencyLists User-User follow/fav graph + * @param knownFor KnownFor data set. Each user can be known for several clusters with certain + * knownFor weights. + * @param socialProofThreshold A user will only be interested in a cluster if they follow/fav/message at + * least certain number of users known for this cluster. + * @param topicAlignmentBoost Multiplier to boost scores for clusters that are aligned between + * User A (producer) and User B (consumer). Default 1.0 (no boost). + * Set to > 1.0 to increase recommendations from User A to User B + * for aligned topics. + * @param existingInterestedIn Optional existing InterestedIn embeddings. If provided, clusters + * that exist in both the producer's KnownFor and consumer's existing + * InterestedIn will be boosted by topicAlignmentBoost. + * @param messagingData Optional messaging interaction data. 
Format: (sourceUserId, destUserId, messageScore). + * If provided, when User A messages User B, topic-aligned boosting will be applied + * similar to follows/favorites. Message scores should be normalized (e.g., decayed counts). + * @param uniqueId required for these Stat + * @return + */ + def userClusterPairsWithoutNormalizationWithTopicAlignment( + adjacencyLists: TypedPipe[UserAndNeighbors], + knownFor: TypedPipe[(Long, Array[(Int, Float)])], + socialProofThreshold: Int, + topicAlignmentBoost: Double = 1.0, + existingInterestedIn: Option[TypedPipe[(UserId, ClustersUserIsInterestedIn)]] = None, + messagingData: Option[TypedPipe[(UserId, UserId, Double)]] = None + )( + implicit uniqueId: UniqueID ): TypedPipe[((Long, Int), SrcClusterIntermediateInfo)] = { val edgesToUsersWithKnownFor = Stat("num_edges_to_users_with_known_for") val srcDestClusterTriples = Stat("num_src_dest_cluster_triples") @@ -270,35 +417,122 @@ object InterestedInFromKnownFor { } } + // Join messaging data if provided + // Format: (destUserId, (srcUserId, messageScore)) + val messagingEdges = messagingData match { + case Some(messages) => + messages.map { case (srcId, destId, score) => (destId, (srcId, score)) } + case None => + TypedPipe.empty[(UserId, (UserId, Double))] + } + + // Build a map of consumer (User B) -> set of cluster IDs they're interested in + // This is used to identify topic alignment between User A (producer) and User B (consumer) + val consumerInterestedInClusters = existingInterestedIn match { + case Some(interestedIn) => + interestedIn.flatMap { + case (userId, clusters) => + clusters.clusterIdToScores.keys.map(clusterId => (userId, clusterId)) + } + .group + .toSet + .toTypedPipe + case None => + TypedPipe.empty[(UserId, Set[ClusterId])] + } + implicit val l2b: Long => Array[Byte] = Injection.long2BigEndian - edges + val edgesWithKnownFor = edges .sketch(4000) .join(knownFor) + + // Join edges with messaging data + val edgesWithMessaging = edgesWithKnownFor + .map { case 
(destId, (srcWithWeights, clusterArray)) => + (destId, (srcWithWeights.neighborId, srcWithWeights, clusterArray)) + } + .leftJoin(messagingEdges) + .map { + case (destId, ((srcId, srcWithWeights, clusterArray), messagingOpt)) => + (destId, (srcId, srcWithWeights, clusterArray, messagingOpt)) + } + + // Join with consumer's existing InterestedIn clusters to check for topic alignment + val edgesWithAlignment = if (existingInterestedIn.isDefined && topicAlignmentBoost > 1.0) { + edgesWithMessaging + .map { case (destId, (srcId, srcWithWeights, clusterArray, messagingOpt)) => + (srcId, (destId, srcWithWeights, clusterArray, messagingOpt)) + } + .leftJoin(consumerInterestedInClusters) + .map { + case (srcId, ((destId, srcWithWeights, clusterArray, messagingOpt), consumerClustersOpt)) => + (destId, (srcWithWeights, clusterArray, consumerClustersOpt.getOrElse(Set.empty[ClusterId]), messagingOpt)) + } + } else { + edgesWithMessaging.map { + case (destId, (srcId, srcWithWeights, clusterArray, messagingOpt)) => + (destId, (srcWithWeights, clusterArray, Set.empty[ClusterId], messagingOpt)) + } + } + + edgesWithAlignment .flatMap { - case (destId, (srcWithWeights, clusterArray)) => + case (destId, (srcWithWeights, clusterArray, consumerClusters, messagingOpt)) => edgesToUsersWithKnownFor.inc() clusterArray.toList.map { case (clusterId, knownForScoreF) => val knownForScore = math.max(0.0, knownForScoreF.toDouble) + // Check if this cluster is aligned between User A (producer) and User B (consumer) + // Cluster is aligned if it exists in both: + // 1. User A's KnownFor clusters (this clusterId) + // 2. 
User B's existing InterestedIn clusters (consumerClusters) + val isAligned = consumerClusters.contains(clusterId) + val alignmentMultiplier = if (isAligned && topicAlignmentBoost > 1.0) { + topicAlignmentBoost + } else { + 1.0 + } + + // Get messaging score if available + val (messageScore, messageScoreProducerNormalized, messageSocialProof) = messagingOpt match { + case Some((srcId, msgScore)) if msgScore > 0.0 => + // Apply topic alignment boost for messaging if clusters are aligned + val baseMessageScore = msgScore * knownForScore + val boostedMessageScore = baseMessageScore * alignmentMultiplier + // For producer normalization, we use a simple normalization (can be enhanced) + val normalizedMessageScore = boostedMessageScore * 0.1 // Simple normalization factor + (boostedMessageScore, normalizedMessageScore, List(destId)) + case _ => + (0.0, 0.0, Nil) + } + srcDestClusterTriples.inc() - val followScore = + val baseFollowScore = if (srcWithWeights.isFollowed.contains(true)) knownForScore else 0.0 - val followScoreProducerNormalizedOnly = + val followScore = baseFollowScore * alignmentMultiplier + + val baseFollowScoreProducerNormalizedOnly = srcWithWeights.followScoreNormalizedByNeighborFollowersL2.getOrElse( 0.0) * knownForScore - val favScore = + val followScoreProducerNormalizedOnly = baseFollowScoreProducerNormalizedOnly * alignmentMultiplier + + val baseFavScore = srcWithWeights.favScoreHalfLife100Days.getOrElse(0.0) * knownForScore + val favScore = baseFavScore * alignmentMultiplier - val favScoreProducerNormalizedOnly = + val baseFavScoreProducerNormalizedOnly = srcWithWeights.favScoreHalfLife100DaysNormalizedByNeighborFaversL2.getOrElse( 0.0) * knownForScore + val favScoreProducerNormalizedOnly = baseFavScoreProducerNormalizedOnly * alignmentMultiplier - val logFavScore = srcWithWeights.logFavScore.getOrElse(0.0) * knownForScore + val baseLogFavScore = srcWithWeights.logFavScore.getOrElse(0.0) * knownForScore + val logFavScore = baseLogFavScore * 
alignmentMultiplier - val logFavScoreProducerNormalizedOnly = srcWithWeights.logFavScoreL2Normalized + val baseLogFavScoreProducerNormalizedOnly = srcWithWeights.logFavScoreL2Normalized .getOrElse(0.0) * knownForScore + val logFavScoreProducerNormalizedOnly = baseLogFavScoreProducerNormalizedOnly * alignmentMultiplier val followSocialProof = if (srcWithWeights.isFollowed.contains(true)) { List(destId) @@ -316,8 +550,11 @@ object InterestedInFromKnownFor { favScoreProducerNormalizedOnly, logFavScore, logFavScoreProducerNormalizedOnly, + messageScore, + messageScoreProducerNormalized, followSocialProof, - favSocialProof + favSocialProof, + messageSocialProof ) ) } @@ -325,9 +562,9 @@ object InterestedInFromKnownFor { .sumByKey .withReducers(10000) .filter { - case ((_, _), SrcClusterIntermediateInfo(_, _, _, _, _, _, followProof, favProof)) => + case ((_, _), SrcClusterIntermediateInfo(_, _, _, _, _, _, _, _, followProof, favProof, messageProof)) => srcClusterPairsBeforeSocialProofThresholding.inc() - val distinctSocialProof = (followProof ++ favProof).toSet + val distinctSocialProof = (followProof ++ favProof ++ messageProof).toSet val result = distinctSocialProof.size >= socialProofThreshold if (result) { srcClusterPairsAfterSocialProofThresholding.inc() @@ -359,6 +596,9 @@ object InterestedInFromKnownFor { favScoreProducerNormalizedOnly, logFavScore, logFavScoreProducerNormalizedOnly, + messageScore, + messageScoreProducerNormalized, + _, _, _ ) @@ -372,7 +612,9 @@ object InterestedInFromKnownFor { square(favScore), square(favScoreProducerNormalizedOnly), square(logFavScore), - square(logFavScoreProducerNormalizedOnly) + square(logFavScoreProducerNormalizedOnly), + square(messageScore), + square(messageScoreProducerNormalized) ) ) } @@ -425,8 +667,11 @@ object InterestedInFromKnownFor { favScoreProducerNormalizedOnly, logFavScore, logFavScoreProducerNormalizedOnly, // not used for now + messageScore, + messageScoreProducerNormalized, followProof, - favProof + 
favProof, + messageProof ) ), ( @@ -436,7 +681,9 @@ object InterestedInFromKnownFor { favNorm, favProducerNormalizedNorm, logFavNorm, - logFavProducerNormalizedNorm // not used for now + logFavProducerNormalizedNorm, // not used for now + messageNorm, + messageProducerNormalizedNorm ) ) ) => @@ -492,8 +739,11 @@ object InterestedInFromKnownFor { favScoreProducerNormalizedOnly, logFavScore, logFavScoreProducerNormalizedOnly, + messageScore, + messageScoreProducerNormalized, followProof, - favProof + favProof, + messageProof ) ) => ( @@ -589,6 +839,53 @@ object InterestedInFromKnownFor { ) } + /** + * Enhanced run method that supports topic-aligned boosting. + * When User B follows/favorites User A, this boosts clusters that are aligned between both users, + * resulting in User B receiving more recommendations from User A for aligned topics. + * + * @param adjacencyLists User-User follow/fav graph + * @param knownFor KnownFor data set + * @param socialProofThreshold Minimum number of users to follow/fav for a cluster to be considered + * @param maxClustersPerUser Maximum number of clusters to keep per user + * @param knownForModelVersion Model version string + * @param topicAlignmentBoost Multiplier to boost scores for aligned clusters (default 1.0 = no boost). + * Recommended values: 1.5-2.0 for moderate boost, 2.0-3.0 for strong boost. + * @param existingInterestedIn Optional existing InterestedIn embeddings. If provided, clusters + * that exist in both the producer's KnownFor and consumer's existing + * InterestedIn will be boosted. This enables iterative refinement where + * you can use a previous run's output as input for alignment. 
+ * @param uniqueId required for these Stat + * @return + */ + def runWithTopicAlignment( + adjacencyLists: TypedPipe[UserAndNeighbors], + knownFor: TypedPipe[(UserId, Array[(ClusterId, Float)])], + socialProofThreshold: Int, + maxClustersPerUser: Int, + knownForModelVersion: String, + topicAlignmentBoost: Double = 1.5, + existingInterestedIn: Option[TypedPipe[(UserId, ClustersUserIsInterestedIn)]] = None, + messagingData: Option[TypedPipe[(UserId, UserId, Double)]] = None + )( + implicit uniqueId: UniqueID + ): TypedPipe[(UserId, ClustersUserIsInterestedIn)] = { + keepOnlyTopClusters( + attachNormalizedScores( + userClusterPairsWithoutNormalizationWithTopicAlignment( + adjacencyLists, + knownFor, + socialProofThreshold, + topicAlignmentBoost, + existingInterestedIn, + messagingData + ) + ), + maxClustersPerUser, + knownForModelVersion + ) + } + /** * run the interestedIn job, cluster normalized scores are not attached to user's clusters. */